CORD-1289 Unit tests for event engine
Change-Id: I99a4b38c8593c842462c03026f6cdfe9045a3e8e
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
index 366025a..60cdcbb 100644
--- a/xos/synchronizers/new_base/event_engine.py
+++ b/xos/synchronizers/new_base/event_engine.py
@@ -38,9 +38,11 @@
self.bootstrap_servers = bootstrap_servers
self.daemon = True
- def run(self):
+ def create_kafka_consumer(self):
from kafka import KafkaConsumer
+ return KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
+ def run(self):
if (not self.step.topics) and (not self.step.pattern):
raise Exception("Neither topics nor pattern is defined for step %s" % self.step.__name__)
@@ -50,7 +52,7 @@
while True:
try:
- self.consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
+ self.consumer = self.create_kafka_consumer()
if self.step.topics:
self.consumer.subscribe(topics=self.step.topics)
elif self.step.pattern:
diff --git a/xos/synchronizers/new_base/tests/event_steps/event_step.py b/xos/synchronizers/new_base/tests/event_steps/event_step.py
new file mode 100644
index 0000000..d83fbe7
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/event_steps/event_step.py
@@ -0,0 +1,28 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synchronizers.new_base.eventstep import EventStep
+from mock_modelaccessor import *
+
+class TestEventStep(EventStep):
+ technology = "kafka"
+ topics = ["sometopic"]
+ pattern = None
+
+ def __init__(self, log, *args, **kwargs):
+ super(TestEventStep, self).__init__(log, *args, **kwargs)
+
+ def process_event(self, event):
+ print "received an event", event
+
diff --git a/xos/synchronizers/new_base/tests/test_config.yaml b/xos/synchronizers/new_base/tests/test_config.yaml
index cd3dd64..2a01b6e 100644
--- a/xos/synchronizers/new_base/tests/test_config.yaml
+++ b/xos/synchronizers/new_base/tests/test_config.yaml
@@ -18,6 +18,9 @@
username: xosadmin@opencord.org
password: "sample"
kind: testframework
+event_bus:
+ endpoint: "fake"
+ kind: kafka
logging:
version: 1
handlers:
@@ -27,7 +30,8 @@
'':
handlers:
- console
- level: DEBUG
+ level: DEBUG
dependency_graph: "tests/model-deps"
steps_dir: "tests/steps"
pull_steps_dir: "tests/pull_steps"
+event_steps_dir: "tests/event_steps"
diff --git a/xos/synchronizers/new_base/tests/test_event_engine.py b/xos/synchronizers/new_base/tests/test_event_engine.py
new file mode 100644
index 0000000..1424643
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/test_event_engine.py
@@ -0,0 +1,227 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+import unittest
+from mock import patch, PropertyMock, ANY
+import mock
+import pdb
+
+import os, sys
+import time
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+xos_dir = os.path.join(test_path, '..', '..', '..')
+
+def config_get_mock(orig, overrides, key):
+ if key in overrides:
+ return overrides[key]
+ else:
+ return orig(key)
+
+class FakeKafkaConsumer():
+ def __init__(self, values=["sampleevent"]):
+ self.values = values
+
+ def subscribe(self, topics=None, pattern=None):
+ pass
+
+ def __iter__(self):
+ for x in self.values:
+ yield x
+ # block forever
+ time.sleep(1000)
+
+class TestEventEngine(unittest.TestCase):
+ def setUp(self):
+ global XOSKafkaThread, Config, event_engine_log
+
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+ sys.path.append(xos_dir)
+ sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
+ sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base', 'tests', 'event_steps'))
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+ Config.clear()
+ Config.init(config, 'synchronizer-config-schema.yaml')
+
+ from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
+ build_mock_modelaccessor(xos_dir, services_dir=None, service_xprotos=[])
+
+ os.chdir(os.path.join(test_path, '..')) # config references tests/model-deps
+
+ from event_engine import XOSKafkaThread, XOSEventEngine
+ from event_engine import log as event_engine_log
+
+ self.event_steps_dir = Config.get("event_steps_dir")
+ self.event_engine = XOSEventEngine()
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+ def test_load_event_step_modules(self):
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+ self.assertEqual(len(self.event_engine.event_steps), 1)
+
+ def test_start(self):
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
+ patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \
+ patch.object(self.event_engine.event_steps[0], "process_event") as process_event:
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 1)
+
+ # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
+ time.sleep(0.1)
+
+ # We should have subscribed to the fake consumer
+ fake_subscribe.assert_called_with(topics=["sometopic"])
+
+ # The fake consumer will have returned one event, and that event will have been passed to our step
+ process_event.assert_called_with("sampleevent")
+
+ def test_start_with_pattern(self):
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
+ patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \
+ patch.object(self.event_engine.event_steps[0], "process_event") as process_event, \
+ patch.object(self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock) as pattern, \
+ patch.object(self.event_engine.event_steps[0], "topics", new_callable=PropertyMock) as topics:
+
+ pattern.return_value = "somepattern"
+ topics.return_value = []
+
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 1)
+
+ # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
+ time.sleep(0.1)
+
+ # We should have subscribed to the fake consumer
+ fake_subscribe.assert_called_with(pattern="somepattern")
+
+ # The fake consumer will have returned one event, and that event will have been passed to our step
+ process_event.assert_called_with("sampleevent")
+
+ def test_start_bad_tech(self):
+ """ Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and
+ not create any threads.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
+ patch.object(event_engine_log, "error") as log_error, \
+ patch.object(self.event_engine.event_steps[0], "technology") as technology:
+ technology.return_value = "not_kafka"
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 0)
+
+ log_error.assert_called_with('Unknown technology. Skipping step', step="TestEventStep",
+ technology=ANY)
+
+ def test_start_bad_no_topics(self):
+ """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
+ with an exception before calling subscribe.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
+ patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \
+ patch.object(self.event_engine.event_steps[0], "topics", new_callable=PropertyMock) as topics:
+ topics.return_value = []
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ # the thread does get launched, but it will fail with an exception
+ self.assertEqual(len(self.event_engine.threads), 1)
+
+ time.sleep(0.1)
+
+ fake_subscribe.assert_not_called()
+
+ def test_start_bad_topics_and_pattern(self):
+ """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
+ with an exception before calling subscribe.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
+ patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \
+ patch.object(self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock) as pattern:
+ pattern.return_value = "foo"
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ # the thread does get launched, but it will fail with an exception
+ self.assertEqual(len(self.event_engine.threads), 1)
+
+ time.sleep(0.1)
+
+ fake_subscribe.assert_not_called()
+
+ def test_start_config_no_eventbus_kind(self):
+ """ Set a blank event_bus.kind in Config. XOSEventEngine.start() should print an error message and
+ not create any threads.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ config_get_orig = Config.get
+ with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
+ patch.object(event_engine_log, "error") as log_error, \
+ patch.object(Config, "get", new=functools.partial(config_get_mock, config_get_orig, {"event_bus.kind": None})):
+
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 0)
+
+ log_error.assert_called_with('Eventbus kind is not configured in synchronizer config file.')
+
+ def test_start_config_bad_eventbus_kind(self):
+ """ Set an unknown event_bus.kind in Config. XOSEventEngine.start() should print an error message and
+ not create any threads.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ config_get_orig = Config.get
+ with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
+ patch.object(event_engine_log, "error") as log_error, \
+ patch.object(Config, "get",
+ new=functools.partial(config_get_mock, config_get_orig, {"event_bus.kind": "not_kafka"})):
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 0)
+
+ log_error.assert_called_with('Eventbus kind is set to a technology we do not implement.',
+ eventbus_kind='not_kafka')
+
+if __name__ == '__main__':
+ unittest.main()