CORD-1289 Implement Event Engine

Change-Id: I606d4a806bcd54fb9207f320f9338c339602bb40
diff --git a/containers/xos/pip_requested.txt b/containers/xos/pip_requested.txt
index ddb2e1a..7dbed2f 100644
--- a/containers/xos/pip_requested.txt
+++ b/containers/xos/pip_requested.txt
@@ -15,6 +15,7 @@
 google-api-python-client==1.6.5
 grpcio-tools==1.9.1
 grpcio==1.9.1
+kafka==1.3.5
 keystoneauth1==3.4.0
 mock==2.0.0
 multistructlog==1.5
diff --git a/containers/xos/pip_requirements.txt b/containers/xos/pip_requirements.txt
index 9ebf227..14f4087 100644
--- a/containers/xos/pip_requirements.txt
+++ b/containers/xos/pip_requirements.txt
@@ -58,6 +58,7 @@
 jsonpatch==1.21
 jsonpointer==2.0
 jsonschema==2.6.0
+kafka==1.3.5
 keystoneauth1==3.4.0
 kombu==4.1.0
 mock==2.0.0
diff --git a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
index bdaf938..df73a25 100644
--- a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
+++ b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
@@ -57,6 +57,8 @@
     type: str
   steps_dir:
     type: str
+  event_steps_dir:
+    type: str
   pull_steps_dir:
     type: str
   sys_dir:
@@ -76,6 +78,15 @@
       kind:
         type: str
         required: False
+  event_bus:
+    type: map
+    required: False
+    map:
+      endpoint:
+        type: str
+      kind:
+        type: str
+        required: False
   required_models:
     type: seq
     sequence:
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 24e7492..679fc5e 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -23,6 +23,7 @@
 from synchronizers.new_base.syncstep import SyncStep
 from synchronizers.new_base.event_loop import XOSObserver
 from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
+from synchronizers.new_base.event_engine import XOSEventEngine
 from synchronizers.new_base.modelaccessor import *
 
 from xosconfig import Config
@@ -100,6 +101,7 @@
         observer_thread = None
         watcher_thread = None
         model_policy_thread = None
+        event_engine = None
 
         model_accessor.update_diag(sync_start=time.time(), backend_status="Synchronizer Start")
 
@@ -133,6 +135,14 @@
             self.log.info(""
                           "Skipping observer and watcher threads due to no steps dir.")
 
+        event_steps_dir = Config.get("event_steps_dir")
+        if event_steps_dir:
+            event_engine = XOSEventEngine()
+            event_engine.load_event_step_modules(event_steps_dir)
+            event_engine.start()
+        else:
+            self.log.info("Skipping event engine due to no event_steps dir.")
+
         # start model policies thread
         policies_dir = Config.get("model_policies_dir")
         if policies_dir:
@@ -142,8 +152,8 @@
         else:
             self.log.info("Skipping model policies thread due to no model_policies dir.")
 
-        if (not observer_thread) and (not watcher_thread) and (not model_policy_thread):
-            self.log.info("No sync steps and no policies. Synchronizer exiting.")
+        if (not observer_thread) and (not watcher_thread) and (not model_policy_thread) and (not event_engine):
+            self.log.info("No sync steps, no policies, and no event steps. Synchronizer exiting.")
             # the caller will exit with status 0
             return
 
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
new file mode 100644
index 0000000..366025a
--- /dev/null
+++ b/xos/synchronizers/new_base/event_engine.py
@@ -0,0 +1,132 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get('logging'))
+
+class XOSKafkaThread(threading.Thread):
+    """ XOSKafkaTrhead
+
+        A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
+        KafkaConsumer is launched to listen on the topics specified by the thread. The thread's process_event()
+        function is called for each event.
+    """
+
+    def __init__(self, step, bootstrap_servers, *args, **kwargs):
+        super(XOSKafkaThread, self).__init__(*args, **kwargs)
+        self.consumer = None
+        self.step = step
+        self.bootstrap_servers = bootstrap_servers
+        self.daemon = True
+
+    def run(self):
+        from kafka import KafkaConsumer
+
+        if (not self.step.topics) and (not self.step.pattern):
+            raise Exception("Neither topics nor pattern is defined for step %s" % self.step.__name__)
+
+        if self.step.topics and self.step.pattern:
+            raise Exception("Both topics and pattern are defined for step %s. Choose one." %
+                            self.step.__name__)
+
+        while True:
+            try:
+                self.consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
+                if self.step.topics:
+                    self.consumer.subscribe(topics=self.step.topics)
+                elif self.step.pattern:
+                    self.consumer.subscribe(pattern=self.step.pattern)
+
+                log.info("Waiting for events",
+                         topic=self.step.topics,
+                         pattern=self.step.pattern,
+                         step=self.step.__name__)
+
+                for msg in self.consumer:
+                    log.info("Processing event", msg=msg, step=self.step.__name__)
+                    try:
+                        self.step(log=log).process_event(msg)
+                    except:
+                        log.exception("Exception in event step", msg=msg, step=self.step.__name__)
+            except:
+                # Maybe Kafka has not started yet. Log the exception and try again in a second.
+                log.exception("Exception in kafka loop")
+                time.sleep(1)
+
+class XOSEventEngine:
+    """ XOSEventEngine
+
+        Subscribe to and handle processing of events. Two methods are defined:
+
+            load_step_modules(dir) ... look for step modules in the given directory, and load objects that are
+                                       descendant from EventStep.
+
+            start() ... Launch threads to handle processing of the EventSteps. It's expected that load_step_modules()
+                        will be called before start().
+    """
+
+    def __init__(self):
+        self.event_steps = []
+        self.threads = []
+
+    def load_event_step_modules(self, event_step_dir):
+        self.event_steps = []
+        log.info("Loading event steps", pull_step_dir=event_step_dir)
+
+        # NOTE we'll load all the classes that inherit from EventStep
+        for fn in os.listdir(event_step_dir):
+            pathname = os.path.join(event_step_dir, fn)
+            if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py") and ("test" not in fn):
+                event_module = imp.load_source(fn[:-3], pathname)
+
+                for classname in dir(event_module):
+                    c = getattr(event_module, classname, None)
+
+                    if inspect.isclass(c):
+                        base_names = [b.__name__ for b in c.__bases__]
+                        if 'EventStep' in base_names:
+                            self.event_steps.append(c)
+        log.info("Loaded event steps", steps=self.event_steps)
+
+    def start(self):
+        eventbus_kind = Config.get("event_bus.kind")
+        eventbus_endpoint = Config.get("event_bus.endpoint")
+
+        if not eventbus_kind:
+            log.error("Eventbus kind is not configured in synchronizer config file.")
+            return
+
+        if eventbus_kind not in ["kafka"]:
+            log.error("Eventbus kind is set to a technology we do not implement.", eventbus_kind=eventbus_kind)
+            return
+
+        if not eventbus_endpoint:
+            log.error("Eventbus endpoint is not configured in synchronizer config file.")
+            return
+
+        for step in self.event_steps:
+            if step.technology == "kafka":
+                thread = XOSKafkaThread(step=step, bootstrap_servers=[eventbus_endpoint])
+                thread.start()
+                self.threads.append(thread)
+            else:
+                log.error("Unknown technology. Skipping step", technology=step.technology, step=step.__name__)
diff --git a/xos/synchronizers/new_base/event_manager.py b/xos/synchronizers/new_base/event_manager.py
deleted file mode 100644
index 54bba5b..0000000
--- a/xos/synchronizers/new_base/event_manager.py
+++ /dev/null
@@ -1,133 +0,0 @@
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# FIXME Appear that a lot of unused code sits in here
-
-import threading
-from xosconfig import Config
-import uuid
-import os
-import imp
-import inspect
-import base64
-import json
-import traceback
-
-# NOTE can we use a path relative to this file?
-XOS_DIR = "/opt/xos"
-
-# NOTE I believe fofum is not used anymore, can we remove this?
-if Config.get("fofum_disabled") is None:
-    from fofum import Fofum
-    fofum_enabled = True
-else:
-    fofum_enabled = False
-
-random_client_id=None
-def get_random_client_id():
-    global random_client_id
-
-    if (random_client_id is None) and os.path.exists(XOS_DIR + "/random_client_id"):
-        # try to use the last one we used, if we saved it
-        try:
-            random_client_id = open(XOS_DIR+"/random_client_id","r").readline().strip()
-            print "get_random_client_id: loaded %s" % random_client_id
-        except:
-            print "get_random_client_id: failed to read " + XOS_DIR + "/random_client_id"
-
-    if random_client_id is None:
-        random_client_id = base64.urlsafe_b64encode(os.urandom(12))
-        print "get_random_client_id: generated new id %s" % random_client_id
-
-        # try to save it for later (XXX: could race with another client here)
-        try:
-            open(XOS_DIR + "/random_client_id","w").write("%s\n" % random_client_id)
-        except:
-            print "get_random_client_id: failed to write " + XOS_DIR + "/random_client_id"
-
-    return random_client_id
-
-# decorator that marks dispatachable event methods
-def event(func):
-    setattr(func, 'event', func.__name__)
-    return func
-
-class EventHandler:
-    # This code is currently not in use.
-    def __init__(self):
-        pass
-
-    @staticmethod
-    def get_events():
-        events = []
-        for name in dir(EventHandler):
-            attribute = getattr(EventHandler, name)
-            if hasattr(attribute, 'event'):
-                events.append(getattr(attribute, 'event'))
-        return events
-
-    def dispatch(self, event, *args, **kwds):
-        if hasattr(self, event):
-            return getattr(self, event)(*args, **kwds)
-            
-
-class EventSender:
-    def __init__(self,user=None,clientid=None):
-
-        user = Config.get("feefie.client_user")
-
-        try:
-            clid = Config.get("feefie.client_id")
-        except:
-            clid = get_random_client_id()
-            print "EventSender: no feefie_client_id configured. Using random id %s" % clid
-
-        if fofum_enabled:
-            self.fofum = Fofum(user=user)
-            self.fofum.make(clid)
-
-    def fire(self,**kwargs):
-        kwargs["uuid"] = str(uuid.uuid1())
-        if fofum_enabled:
-            self.fofum.fire(json.dumps(kwargs))
-
-class EventListener:
-    def __init__(self,wake_up=None):
-        self.handler = EventHandler()
-        self.wake_up = wake_up
-
-    def handle_event(self, payload):
-        payload_dict = json.loads(payload)
-
-        if (self.wake_up):
-            self.wake_up()
-
-    def run(self):
-        # This is our unique client id, to be used when firing and receiving events
-        # It needs to be generated once and placed in the config file
-
-        user = Config.get("feefie.client_user")
-
-        try:
-            clid = Config.get("feefie.client_id")
-        except:
-            clid = get_random_client_id()
-            print "EventListener: no feefie_client_id configured. Using random id %s" % clid
-
-        if fofum_enabled:
-            f = Fofum(user=user)
-
-            listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event))
-            listener_thread.start()
diff --git a/xos/synchronizers/new_base/eventstep.py b/xos/synchronizers/new_base/eventstep.py
new file mode 100644
index 0000000..80e89e6
--- /dev/null
+++ b/xos/synchronizers/new_base/eventstep.py
@@ -0,0 +1,39 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class EventStep(object):
+    """
+    All the event steps defined in each synchronizer needs to inherit from this class in order to be loaded
+
+    Each step should define a technology, and either a `topics` or a `pattern`. The meaning of `topics` and `pattern`
+    depend on the technology that is chosen.
+    """
+
+    technology = "kafka"
+    topics = []
+    pattern = None
+
+    def __init__(self, log, **kwargs):
+        """
+        Initialize a pull step. Override this function to include any initialization. Make sure to call the original
+        __init__() from your method.
+        """
+
+        # self.log can be used to emit logging messages.
+        self.log = log
+
+    def process_event(self, event):
+        # This method must be overridden in your class. Do not call the original method.
+
+        self.log.warning("There is no default process_event, please provide a process_event method", msg=event)
\ No newline at end of file