SEBA-405 Convert synchronizer framework to library
Change-Id: If8562f23dc15c7d18d7a8b040b33756708b3c5ec
diff --git a/lib/xos-synchronizer/xossynchronizer/event_engine.py b/lib/xos-synchronizer/xossynchronizer/event_engine.py
new file mode 100644
index 0000000..e5e18d1
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/event_engine.py
@@ -0,0 +1,216 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import confluent_kafka
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+
+
+class XOSKafkaMessage:
+    """Plain-attribute copy of a consumed Kafka message (topic, key, value, timestamp)."""
+
+    def __init__(self, consumer_msg):
+        self.topic = consumer_msg.topic()
+        self.key = consumer_msg.key()
+        self.value = consumer_msg.value()
+
+        # timestamp() returns (type, value); compare the type constant by value, not identity
+        ts_type, ts_val = consumer_msg.timestamp()
+        has_timestamp = ts_type != confluent_kafka.TIMESTAMP_NOT_AVAILABLE
+        self.timestamp = ts_val if has_timestamp else None  # None when not available
+
+
+class XOSKafkaThread(threading.Thread):
+    """ XOSKafkaThread
+
+        A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
+        Consumer is launched to listen on the topics (or pattern) specified by the step. The step's
+        process_event() function is called for each event.
+    """
+
+    def __init__(self, step, bootstrap_servers, log, *args, **kwargs):
+        """Keep the event step class, the list of broker addresses and the logger; run as a daemon."""
+        super(XOSKafkaThread, self).__init__(*args, **kwargs)
+        self.consumer = None
+        self.step = step
+        self.bootstrap_servers = bootstrap_servers
+        self.log = log
+        self.daemon = True
+
+    def create_kafka_consumer(self):
+        """Return a new confluent_kafka.Consumer for self.bootstrap_servers."""
+        # use the service name as the group id
+        consumer_config = {
+            "group.id": Config().get("name"),
+            "bootstrap.servers": ",".join(self.bootstrap_servers),
+            "default.topic.config": {"auto.offset.reset": "smallest"},
+        }
+
+        return confluent_kafka.Consumer(**consumer_config)
+
+    def run(self):
+        """Consume events forever, passing each to the step; raise if topics/pattern are misconfigured."""
+        name = self.step.__name__  # step class name, used in error messages and log fields
+        if (not self.step.topics) and (not self.step.pattern):
+            raise Exception("Neither topics nor pattern is defined for step %s" % name)
+
+        if self.step.topics and self.step.pattern:
+            raise Exception(
+                "Both topics and pattern are defined for step %s. Choose one." % name
+            )
+
+        self.log.info(
+            "Waiting for events",
+            topic=self.step.topics,
+            pattern=self.step.pattern,
+            step=name,
+        )
+
+        while True:
+            try:
+                # publish the consumer only once subscribed, so a failed subscribe is retried
+                if self.consumer is None:
+                    consumer = self.create_kafka_consumer()
+                    consumer.subscribe(self.step.topics or self.step.pattern)
+                    self.consumer = consumer
+
+            except confluent_kafka.KafkaException as e:
+                # confluent_kafka raises KafkaException (an int error code is not a valid except
+                # target); the underlying KafkaError, when present, is args[0]
+                kafka_error = e.args[0] if e.args else None
+                code = kafka_error.code() if hasattr(kafka_error, "code") else None
+                if code == confluent_kafka.KafkaError._ALL_BROKERS_DOWN:
+                    self.log.warning(
+                        "No brokers available on %s, %s" % (self.bootstrap_servers, e)
+                    )
+                    time.sleep(20)
+                else:
+                    # Maybe Kafka has not started yet. Log the exception and try again in a second.
+                    self.log.exception("Exception in kafka loop: %s" % e)
+                    time.sleep(1)
+                continue
+
+            # wait until we get a message, if no message, loop again
+            msg = self.consumer.poll(timeout=1.0)
+
+            if msg is None:
+                continue
+
+            if msg.error():
+                if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+                    self.log.debug(
+                        "Reached end of kafka topic %s, partition: %s, offset: %d"
+                        % (msg.topic(), msg.partition(), msg.offset())
+                    )
+                else:
+                    # no exception is being handled here, so log.error rather than log.exception
+                    self.log.error("Error in kafka message: %s" % msg.error())
+
+            else:
+                # wrap parsing the event in a class
+                event_msg = XOSKafkaMessage(msg)
+
+                self.log.info("Processing event", event_msg=event_msg, step=name)
+
+                try:
+                    self.step(log=self.log).process_event(event_msg)
+
+                except BaseException:
+                    # any failure in the step is logged and the loop carries on with the next message
+                    self.log.exception(
+                        "Exception in event step",
+                        event_msg=event_msg,
+                        step=name,
+                    )
+
+
+class XOSEventEngine(object):
+    """ XOSEventEngine
+
+        Subscribe to and handle processing of events. Two methods are defined:
+
+            load_event_step_modules(dir) ... look for step modules in the given directory, and load the
+                                             classes that have a direct base named EventStep.
+
+            start() ... Launch threads to handle processing of the EventSteps. It's expected that
+                        load_event_step_modules() will be called before start().
+    """
+
+    def __init__(self, log):
+        self.log = log
+        self.event_steps = []
+        self.threads = []
+
+    def load_event_step_modules(self, event_step_dir):
+        """Load the step modules in event_step_dir and collect their EventStep subclasses."""
+        self.event_steps = []
+        self.log.info("Loading event steps", event_step_dir=event_step_dir)
+
+        for filename in os.listdir(event_step_dir):
+            path = os.path.join(event_step_dir, filename)
+            # only regular .py files; __init__.py and any name containing "test" are skipped
+            if not (os.path.isfile(path) and filename.endswith(".py")):
+                continue
+            if filename == "__init__.py" or "test" in filename:
+                continue
+            module = imp.load_source(filename[:-3], path)
+            # keep every class that lists a base named EventStep among its direct bases
+            for attr_name in dir(module):
+                candidate = getattr(module, attr_name, None)
+                if not inspect.isclass(candidate):
+                    continue
+                if any(base.__name__ == "EventStep" for base in candidate.__bases__):
+                    self.event_steps.append(candidate)
+
+        self.log.info("Loaded event steps", steps=self.event_steps)
+
+    def start(self):
+        """Start one XOSKafkaThread per loaded kafka step, unless the event bus is misconfigured."""
+        kind = Config.get("event_bus.kind")
+        endpoint = Config.get("event_bus.endpoint")
+
+        if not kind:
+            self.log.error(
+                "Eventbus kind is not configured in synchronizer config file."
+            )
+            return
+
+        if kind not in ("kafka",):
+            self.log.error(
+                "Eventbus kind is set to a technology we do not implement.",
+                eventbus_kind=kind,
+            )
+            return
+
+        if not endpoint:
+            self.log.error(
+                "Eventbus endpoint is not configured in synchronizer config file."
+            )
+            return
+
+        for step in self.event_steps:
+            if step.technology != "kafka":
+                self.log.error(
+                    "Unknown technology. Skipping step",
+                    technology=step.technology,
+                    step=step.__name__,
+                )
+                continue
+            worker = XOSKafkaThread(step, [endpoint], self.log)
+            worker.start()
+            self.threads.append(worker)