SEBA-405 Convert synchronizer framework to library

Change-Id: If8562f23dc15c7d18d7a8b040b33756708b3c5ec
diff --git a/lib/xos-synchronizer/xossynchronizer/event_engine.py b/lib/xos-synchronizer/xossynchronizer/event_engine.py
new file mode 100644
index 0000000..e5e18d1
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/event_engine.py
@@ -0,0 +1,216 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import confluent_kafka
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+
+
+class XOSKafkaMessage:
+    def __init__(self, consumer_msg):
+
+        self.topic = consumer_msg.topic()
+        self.key = consumer_msg.key()
+        self.value = consumer_msg.value()
+
+        self.timestamp = None
+        (ts_type, ts_val) = consumer_msg.timestamp()
+
+        if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
+            self.timestamp = ts_val
+
+
+class XOSKafkaThread(threading.Thread):
+    """ XOSKafkaThread
+
+        A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
+        Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
+        function is called for each event.
+    """
+
+    def __init__(self, step, bootstrap_servers, log, *args, **kwargs):
+        super(XOSKafkaThread, self).__init__(*args, **kwargs)
+        self.consumer = None
+        self.step = step
+        self.bootstrap_servers = bootstrap_servers
+        self.log = log
+        self.daemon = True
+
+    def create_kafka_consumer(self):
+        # use the service name as the group id
+        consumer_config = {
+            "group.id": Config().get("name"),
+            "bootstrap.servers": ",".join(self.bootstrap_servers),
+            "default.topic.config": {"auto.offset.reset": "smallest"},
+        }
+
+        return confluent_kafka.Consumer(**consumer_config)
+
+    def run(self):
+        if (not self.step.topics) and (not self.step.pattern):
+            raise Exception(
+                "Neither topics nor pattern is defined for step %s" % self.step.__name__
+            )
+
+        if self.step.topics and self.step.pattern:
+            raise Exception(
+                "Both topics and pattern are defined for step %s. Choose one."
+                % self.step.__name__
+            )
+
+        self.log.info(
+            "Waiting for events",
+            topic=self.step.topics,
+            pattern=self.step.pattern,
+            step=self.step.__name__,
+        )
+
+        while True:
+            try:
+                # setup consumer or loop on failure
+                if self.consumer is None:
+                    self.consumer = self.create_kafka_consumer()
+
+                    if self.step.topics:
+                        self.consumer.subscribe(self.step.topics)
+
+                    elif self.step.pattern:
+                        self.consumer.subscribe(self.step.pattern)
+
+            except confluent_kafka.KafkaError._ALL_BROKERS_DOWN as e:
+                self.log.warning(
+                    "No brokers available on %s, %s" % (self.bootstrap_servers, e)
+                )
+                time.sleep(20)
+                continue
+
+            except confluent_kafka.KafkaError as e:
+                # Maybe Kafka has not started yet. Log the exception and try again in a second.
+                self.log.exception("Exception in kafka loop: %s" % e)
+                time.sleep(1)
+                continue
+
+            # wait until we get a message, if no message, loop again
+            msg = self.consumer.poll(timeout=1.0)
+
+            if msg is None:
+                continue
+
+            if msg.error():
+                if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+                    self.log.debug(
+                        "Reached end of kafka topic %s, partition: %s, offset: %d"
+                        % (msg.topic(), msg.partition(), msg.offset())
+                    )
+                else:
+                    self.log.exception("Error in kafka message: %s" % msg.error())
+
+            else:
+                # wrap parsing the event in a class
+                event_msg = XOSKafkaMessage(msg)
+
+                self.log.info(
+                    "Processing event", event_msg=event_msg, step=self.step.__name__
+                )
+
+                try:
+                    self.step(log=self.log).process_event(event_msg)
+
+                except BaseException:
+                    self.log.exception(
+                        "Exception in event step",
+                        event_msg=event_msg,
+                        step=self.step.__name__,
+                    )
+
+
+class XOSEventEngine(object):
+    """ XOSEventEngine
+
+        Subscribe to and handle processing of events. Two methods are defined:
+
+            load_step_modules(dir) ... look for step modules in the given directory, and load objects that are
+                                       descendant from EventStep.
+
+            start() ... Launch threads to handle processing of the EventSteps. It's expected that load_step_modules()
+                        will be called before start().
+    """
+
+    def __init__(self, log):
+        self.event_steps = []
+        self.threads = []
+        self.log = log
+
+    def load_event_step_modules(self, event_step_dir):
+        self.event_steps = []
+        self.log.info("Loading event steps", event_step_dir=event_step_dir)
+
+        # NOTE we'll load all the classes that inherit from EventStep
+        for fn in os.listdir(event_step_dir):
+            pathname = os.path.join(event_step_dir, fn)
+            if (
+                os.path.isfile(pathname)
+                and fn.endswith(".py")
+                and (fn != "__init__.py")
+                and ("test" not in fn)
+            ):
+                event_module = imp.load_source(fn[:-3], pathname)
+
+                for classname in dir(event_module):
+                    c = getattr(event_module, classname, None)
+
+                    if inspect.isclass(c):
+                        base_names = [b.__name__ for b in c.__bases__]
+                        if "EventStep" in base_names:
+                            self.event_steps.append(c)
+        self.log.info("Loaded event steps", steps=self.event_steps)
+
+    def start(self):
+        eventbus_kind = Config.get("event_bus.kind")
+        eventbus_endpoint = Config.get("event_bus.endpoint")
+
+        if not eventbus_kind:
+            self.log.error(
+                "Eventbus kind is not configured in synchronizer config file."
+            )
+            return
+
+        if eventbus_kind not in ["kafka"]:
+            self.log.error(
+                "Eventbus kind is set to a technology we do not implement.",
+                eventbus_kind=eventbus_kind,
+            )
+            return
+
+        if not eventbus_endpoint:
+            self.log.error(
+                "Eventbus endpoint is not configured in synchronizer config file."
+            )
+            return
+
+        for step in self.event_steps:
+            if step.technology == "kafka":
+                thread = XOSKafkaThread(step, [eventbus_endpoint], self.log)
+                thread.start()
+                self.threads.append(thread)
+            else:
+                self.log.error(
+                    "Unknown technology. Skipping step",
+                    technology=step.technology,
+                    step=step.__name__,
+                )