SEBA-405 Convert synchronizer framework to library

Change-Id: If8562f23dc15c7d18d7a8b040b33756708b3c5ec
diff --git a/lib/xos-synchronizer/tests/test_event_engine.py b/lib/xos-synchronizer/tests/test_event_engine.py
new file mode 100644
index 0000000..13972c6
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_event_engine.py
@@ -0,0 +1,345 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import confluent_kafka
+import functools
+import unittest
+
+from mock import patch, PropertyMock, ANY
+
+import os
+import sys
+import time
+
+log = None
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+
+print os.getcwd()
+
+def config_get_mock(orig, overrides, key):
+    if key in overrides:
+        return overrides[key]
+    else:
+        return orig(key)
+
+
+class FakeKafkaConsumer:
+    def __init__(self, values=[]):
+        self.values = values
+
+    def subscribe(self, topics):
+        pass
+
+    def poll(self, timeout=1.0):
+        if self.values:
+            return FakeKafkaMessage(self.values.pop())
+        # block forever
+        time.sleep(1000)
+
+
+class FakeKafkaMessage:
+    """ Works like Message in confluent_kafka
+        https://docs.confluent.io/current/clients/confluent-kafka-python/#message
+    """
+
+    def __init__(
+        self,
+        timestamp=None,
+        topic="faketopic",
+        key="fakekey",
+        value="fakevalue",
+        error=False,
+    ):
+
+        if timestamp is None:
+            self.fake_ts_type = confluent_kafka.TIMESTAMP_NOT_AVAILABLE
+            self.fake_timestamp = None
+        else:
+            self.fake_ts_type = confluent_kafka.TIMESTAMP_CREATE_TIME
+            self.fake_timestamp = timestamp
+
+        self.fake_topic = topic
+        self.fake_key = key
+        self.fake_value = value
+        self.fake_error = error
+
+    def error(self):
+        return self.fake_error
+
+    def timestamp(self):
+        return (self.fake_ts_type, self.fake_timestamp)
+
+    def topic(self):
+        return self.fake_topic
+
+    def key(self):
+        return self.fake_key
+
+    def value(self):
+        return self.fake_value
+
+
+class TestEventEngine(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+
+        global log
+
+        config = os.path.join(test_path, "test_config.yaml")
+        from xosconfig import Config
+
+        Config.clear()
+        Config.init(config, "synchronizer-config-schema.yaml")
+
+        if not log:
+            from multistructlog import create_logger
+
+            log = create_logger(Config().get("logging"))
+
+    def setUp(self):
+        global XOSKafkaThread, Config, log
+
+        self.sys_path_save = sys.path
+        self.cwd_save = os.getcwd()
+
+        config = os.path.join(test_path, "test_config.yaml")
+        from xosconfig import Config
+
+        Config.clear()
+        Config.init(config, "synchronizer-config-schema.yaml")
+
+        from xossynchronizer.mock_modelaccessor_build import (
+            build_mock_modelaccessor,
+        )
+
+        build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+
+        # The test config.yaml references files in `test/` so make sure we're in the parent directory of the
+        # test directory.
+        os.chdir(os.path.join(test_path, ".."))
+
+        from xossynchronizer.event_engine import XOSKafkaThread, XOSEventEngine
+
+        self.event_steps_dir = Config.get("event_steps_dir")
+        self.event_engine = XOSEventEngine(log)
+
+    def tearDown(self):
+        sys.path = self.sys_path_save
+        os.chdir(self.cwd_save)
+
+    def test_load_event_step_modules(self):
+        self.event_engine.load_event_step_modules(self.event_steps_dir)
+        self.assertEqual(len(self.event_engine.event_steps), 1)
+
+    def test_start(self):
+        self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+        with patch.object(
+            XOSKafkaThread, "create_kafka_consumer"
+        ) as create_kafka_consumer, patch.object(
+            FakeKafkaConsumer, "subscribe"
+        ) as fake_subscribe, patch.object(
+            self.event_engine.event_steps[0], "process_event"
+        ) as process_event:
+
+            create_kafka_consumer.return_value = FakeKafkaConsumer(
+                values=["sampleevent"]
+            )
+            self.event_engine.start()
+
+            self.assertEqual(len(self.event_engine.threads), 1)
+
+            # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
+            time.sleep(0.1)
+
+            # We should have subscribed to the fake consumer
+            fake_subscribe.assert_called_once()
+
+            # The fake consumer will have returned one event
+            process_event.assert_called_once()
+
+    def test_start_with_pattern(self):
+        self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+        with patch.object(
+            XOSKafkaThread, "create_kafka_consumer"
+        ) as create_kafka_consumer, patch.object(
+            FakeKafkaConsumer, "subscribe"
+        ) as fake_subscribe, patch.object(
+            self.event_engine.event_steps[0], "process_event"
+        ) as process_event, patch.object(
+            self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
+        ) as pattern, patch.object(
+            self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
+        ) as topics:
+
+            pattern.return_value = "somepattern"
+            topics.return_value = []
+
+            create_kafka_consumer.return_value = FakeKafkaConsumer(
+                values=["sampleevent"]
+            )
+            self.event_engine.start()
+
+            self.assertEqual(len(self.event_engine.threads), 1)
+
+            # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
+            time.sleep(0.1)
+
+            # We should have subscribed to the fake consumer
+            fake_subscribe.assert_called_with("somepattern")
+
+            # The fake consumer will have returned one event
+            process_event.assert_called_once()
+
+    def test_start_bad_tech(self):
+        """ Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and
+            not create any threads.
+        """
+
+        self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+        with patch.object(
+            XOSKafkaThread, "create_kafka_consumer"
+        ) as create_kafka_consumer, patch.object(
+            log, "error"
+        ) as log_error, patch.object(
+            self.event_engine.event_steps[0], "technology"
+        ) as technology:
+            technology.return_value = "not_kafka"
+            create_kafka_consumer.return_value = FakeKafkaConsumer()
+            self.event_engine.start()
+
+            self.assertEqual(len(self.event_engine.threads), 0)
+
+            log_error.assert_called_with(
+                "Unknown technology. Skipping step",
+                step="TestEventStep",
+                technology=ANY,
+            )
+
+    def test_start_bad_no_topics(self):
+        """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
+            with an exception before calling subscribe.
+        """
+
+        self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+        with patch.object(
+            XOSKafkaThread, "create_kafka_consumer"
+        ) as create_kafka_consumer, patch.object(
+            FakeKafkaConsumer, "subscribe"
+        ) as fake_subscribe, patch.object(
+            self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
+        ) as topics:
+            topics.return_value = []
+            create_kafka_consumer.return_value = FakeKafkaConsumer()
+            self.event_engine.start()
+
+            # the thread does get launched, but it will fail with an exception
+            self.assertEqual(len(self.event_engine.threads), 1)
+
+            time.sleep(0.1)
+
+            fake_subscribe.assert_not_called()
+
+    def test_start_bad_topics_and_pattern(self):
+        """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
+            with an exception before calling subscribe.
+        """
+
+        self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+        with patch.object(
+            XOSKafkaThread, "create_kafka_consumer"
+        ) as create_kafka_consumer, patch.object(
+            FakeKafkaConsumer, "subscribe"
+        ) as fake_subscribe, patch.object(
+            self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
+        ) as pattern:
+            pattern.return_value = "foo"
+            create_kafka_consumer.return_value = FakeKafkaConsumer()
+            self.event_engine.start()
+
+            # the thread does get launched, but it will fail with an exception
+            self.assertEqual(len(self.event_engine.threads), 1)
+
+            time.sleep(0.1)
+
+            fake_subscribe.assert_not_called()
+
+    def test_start_config_no_eventbus_kind(self):
+        """ Set a blank event_bus.kind in Config. XOSEventEngine.start() should print an error message and
+            not create any threads.
+        """
+
+        self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+        config_get_orig = Config.get
+        with patch.object(
+            XOSKafkaThread, "create_kafka_consumer"
+        ) as create_kafka_consumer, patch.object(
+            log, "error"
+        ) as log_error, patch.object(
+            Config,
+            "get",
+            new=functools.partial(
+                config_get_mock, config_get_orig, {"event_bus.kind": None}
+            ),
+        ):
+
+            create_kafka_consumer.return_value = FakeKafkaConsumer()
+            self.event_engine.start()
+
+            self.assertEqual(len(self.event_engine.threads), 0)
+
+            log_error.assert_called_with(
+                "Eventbus kind is not configured in synchronizer config file."
+            )
+
+    def test_start_config_bad_eventbus_kind(self):
+        """ Set an unknown event_bus.kind in Config. XOSEventEngine.start() should print an error message and
+            not create any threads.
+        """
+
+        self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+        config_get_orig = Config.get
+        with patch.object(
+            XOSKafkaThread, "create_kafka_consumer"
+        ) as create_kafka_consumer, patch.object(
+            log, "error"
+        ) as log_error, patch.object(
+            Config,
+            "get",
+            new=functools.partial(
+                config_get_mock, config_get_orig, {"event_bus.kind": "not_kafka"}
+            ),
+        ):
+            create_kafka_consumer.return_value = FakeKafkaConsumer()
+            self.event_engine.start()
+
+            self.assertEqual(len(self.event_engine.threads), 0)
+
+            log_error.assert_called_with(
+                "Eventbus kind is set to a technology we do not implement.",
+                eventbus_kind="not_kafka",
+            )
+
+
+if __name__ == "__main__":
+    unittest.main()