CORD-1289 Implement Event Engine
Change-Id: I606d4a806bcd54fb9207f320f9338c339602bb40
diff --git a/containers/xos/pip_requested.txt b/containers/xos/pip_requested.txt
index ddb2e1a..7dbed2f 100644
--- a/containers/xos/pip_requested.txt
+++ b/containers/xos/pip_requested.txt
@@ -15,6 +15,7 @@
google-api-python-client==1.6.5
grpcio-tools==1.9.1
grpcio==1.9.1
+kafka==1.3.5
keystoneauth1==3.4.0
mock==2.0.0
multistructlog==1.5
diff --git a/containers/xos/pip_requirements.txt b/containers/xos/pip_requirements.txt
index 9ebf227..14f4087 100644
--- a/containers/xos/pip_requirements.txt
+++ b/containers/xos/pip_requirements.txt
@@ -58,6 +58,7 @@
jsonpatch==1.21
jsonpointer==2.0
jsonschema==2.6.0
+kafka==1.3.5
keystoneauth1==3.4.0
kombu==4.1.0
mock==2.0.0
diff --git a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
index bdaf938..df73a25 100644
--- a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
+++ b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
@@ -57,6 +57,8 @@
type: str
steps_dir:
type: str
+ event_steps_dir:
+ type: str
pull_steps_dir:
type: str
sys_dir:
@@ -76,6 +78,15 @@
kind:
type: str
required: False
+ event_bus:
+ type: map
+ required: False
+ map:
+ endpoint:
+ type: str
+ kind:
+ type: str
+ required: False
required_models:
type: seq
sequence:
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 24e7492..679fc5e 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -23,6 +23,7 @@
from synchronizers.new_base.syncstep import SyncStep
from synchronizers.new_base.event_loop import XOSObserver
from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
+from synchronizers.new_base.event_engine import XOSEventEngine
from synchronizers.new_base.modelaccessor import *
from xosconfig import Config
@@ -100,6 +101,7 @@
observer_thread = None
watcher_thread = None
model_policy_thread = None
+ event_engine = None
model_accessor.update_diag(sync_start=time.time(), backend_status="Synchronizer Start")
@@ -133,6 +135,14 @@
self.log.info(""
"Skipping observer and watcher threads due to no steps dir.")
+ event_steps_dir = Config.get("event_steps_dir")
+ if event_steps_dir:
+ event_engine = XOSEventEngine()
+ event_engine.load_event_step_modules(event_steps_dir)
+ event_engine.start()
+ else:
+ self.log.info("Skipping event engine due to no event_steps dir.")
+
# start model policies thread
policies_dir = Config.get("model_policies_dir")
if policies_dir:
@@ -142,8 +152,8 @@
else:
self.log.info("Skipping model policies thread due to no model_policies dir.")
- if (not observer_thread) and (not watcher_thread) and (not model_policy_thread):
- self.log.info("No sync steps and no policies. Synchronizer exiting.")
+ if (not observer_thread) and (not watcher_thread) and (not model_policy_thread) and (not event_engine):
+ self.log.info("No sync steps, no policies, and no event steps. Synchronizer exiting.")
# the caller will exit with status 0
return
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
new file mode 100644
index 0000000..366025a
--- /dev/null
+++ b/xos/synchronizers/new_base/event_engine.py
@@ -0,0 +1,132 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get('logging'))
+
+class XOSKafkaThread(threading.Thread):
+ """ XOSKafkaTrhead
+
+ A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
+ KafkaConsumer is launched to listen on the topics specified by the thread. The thread's process_event()
+ function is called for each event.
+ """
+
+ def __init__(self, step, bootstrap_servers, *args, **kwargs):
+ super(XOSKafkaThread, self).__init__(*args, **kwargs)
+ self.consumer = None
+ self.step = step
+ self.bootstrap_servers = bootstrap_servers
+ self.daemon = True
+
+ def run(self):
+ from kafka import KafkaConsumer
+
+ if (not self.step.topics) and (not self.step.pattern):
+ raise Exception("Neither topics nor pattern is defined for step %s" % self.step.__name__)
+
+ if self.step.topics and self.step.pattern:
+ raise Exception("Both topics and pattern are defined for step %s. Choose one." %
+ self.step.__name__)
+
+ while True:
+ try:
+ self.consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
+ if self.step.topics:
+ self.consumer.subscribe(topics=self.step.topics)
+ elif self.step.pattern:
+ self.consumer.subscribe(pattern=self.step.pattern)
+
+ log.info("Waiting for events",
+ topic=self.step.topics,
+ pattern=self.step.pattern,
+ step=self.step.__name__)
+
+ for msg in self.consumer:
+ log.info("Processing event", msg=msg, step=self.step.__name__)
+ try:
+ self.step(log=log).process_event(msg)
+ except:
+ log.exception("Exception in event step", msg=msg, step=self.step.__name__)
+ except:
+ # Maybe Kafka has not started yet. Log the exception and try again in a second.
+ log.exception("Exception in kafka loop")
+ time.sleep(1)
+
+class XOSEventEngine:
+ """ XOSEventEngine
+
+ Subscribe to and handle processing of events. Two methods are defined:
+
+ load_step_modules(dir) ... look for step modules in the given directory, and load objects that are
+ descendant from EventStep.
+
+ start() ... Launch threads to handle processing of the EventSteps. It's expected that load_step_modules()
+ will be called before start().
+ """
+
+ def __init__(self):
+ self.event_steps = []
+ self.threads = []
+
+ def load_event_step_modules(self, event_step_dir):
+ self.event_steps = []
+ log.info("Loading event steps", pull_step_dir=event_step_dir)
+
+ # NOTE we'll load all the classes that inherit from EventStep
+ for fn in os.listdir(event_step_dir):
+ pathname = os.path.join(event_step_dir, fn)
+ if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py") and ("test" not in fn):
+ event_module = imp.load_source(fn[:-3], pathname)
+
+ for classname in dir(event_module):
+ c = getattr(event_module, classname, None)
+
+ if inspect.isclass(c):
+ base_names = [b.__name__ for b in c.__bases__]
+ if 'EventStep' in base_names:
+ self.event_steps.append(c)
+ log.info("Loaded event steps", steps=self.event_steps)
+
+ def start(self):
+ eventbus_kind = Config.get("event_bus.kind")
+ eventbus_endpoint = Config.get("event_bus.endpoint")
+
+ if not eventbus_kind:
+ log.error("Eventbus kind is not configured in synchronizer config file.")
+ return
+
+ if eventbus_kind not in ["kafka"]:
+ log.error("Eventbus kind is set to a technology we do not implement.", eventbus_kind=eventbus_kind)
+ return
+
+ if not eventbus_endpoint:
+ log.error("Eventbus endpoint is not configured in synchronizer config file.")
+ return
+
+ for step in self.event_steps:
+ if step.technology == "kafka":
+ thread = XOSKafkaThread(step=step, bootstrap_servers=[eventbus_endpoint])
+ thread.start()
+ self.threads.append(thread)
+ else:
+ log.error("Unknown technology. Skipping step", technology=step.technology, step=step.__name__)
diff --git a/xos/synchronizers/new_base/event_manager.py b/xos/synchronizers/new_base/event_manager.py
deleted file mode 100644
index 54bba5b..0000000
--- a/xos/synchronizers/new_base/event_manager.py
+++ /dev/null
@@ -1,133 +0,0 @@
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# FIXME Appear that a lot of unused code sits in here
-
-import threading
-from xosconfig import Config
-import uuid
-import os
-import imp
-import inspect
-import base64
-import json
-import traceback
-
-# NOTE can we use a path relative to this file?
-XOS_DIR = "/opt/xos"
-
-# NOTE I believe fofum is not used anymore, can we remove this?
-if Config.get("fofum_disabled") is None:
- from fofum import Fofum
- fofum_enabled = True
-else:
- fofum_enabled = False
-
-random_client_id=None
-def get_random_client_id():
- global random_client_id
-
- if (random_client_id is None) and os.path.exists(XOS_DIR + "/random_client_id"):
- # try to use the last one we used, if we saved it
- try:
- random_client_id = open(XOS_DIR+"/random_client_id","r").readline().strip()
- print "get_random_client_id: loaded %s" % random_client_id
- except:
- print "get_random_client_id: failed to read " + XOS_DIR + "/random_client_id"
-
- if random_client_id is None:
- random_client_id = base64.urlsafe_b64encode(os.urandom(12))
- print "get_random_client_id: generated new id %s" % random_client_id
-
- # try to save it for later (XXX: could race with another client here)
- try:
- open(XOS_DIR + "/random_client_id","w").write("%s\n" % random_client_id)
- except:
- print "get_random_client_id: failed to write " + XOS_DIR + "/random_client_id"
-
- return random_client_id
-
-# decorator that marks dispatachable event methods
-def event(func):
- setattr(func, 'event', func.__name__)
- return func
-
-class EventHandler:
- # This code is currently not in use.
- def __init__(self):
- pass
-
- @staticmethod
- def get_events():
- events = []
- for name in dir(EventHandler):
- attribute = getattr(EventHandler, name)
- if hasattr(attribute, 'event'):
- events.append(getattr(attribute, 'event'))
- return events
-
- def dispatch(self, event, *args, **kwds):
- if hasattr(self, event):
- return getattr(self, event)(*args, **kwds)
-
-
-class EventSender:
- def __init__(self,user=None,clientid=None):
-
- user = Config.get("feefie.client_user")
-
- try:
- clid = Config.get("feefie.client_id")
- except:
- clid = get_random_client_id()
- print "EventSender: no feefie_client_id configured. Using random id %s" % clid
-
- if fofum_enabled:
- self.fofum = Fofum(user=user)
- self.fofum.make(clid)
-
- def fire(self,**kwargs):
- kwargs["uuid"] = str(uuid.uuid1())
- if fofum_enabled:
- self.fofum.fire(json.dumps(kwargs))
-
-class EventListener:
- def __init__(self,wake_up=None):
- self.handler = EventHandler()
- self.wake_up = wake_up
-
- def handle_event(self, payload):
- payload_dict = json.loads(payload)
-
- if (self.wake_up):
- self.wake_up()
-
- def run(self):
- # This is our unique client id, to be used when firing and receiving events
- # It needs to be generated once and placed in the config file
-
- user = Config.get("feefie.client_user")
-
- try:
- clid = Config.get("feefie.client_id")
- except:
- clid = get_random_client_id()
- print "EventListener: no feefie_client_id configured. Using random id %s" % clid
-
- if fofum_enabled:
- f = Fofum(user=user)
-
- listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event))
- listener_thread.start()
diff --git a/xos/synchronizers/new_base/eventstep.py b/xos/synchronizers/new_base/eventstep.py
new file mode 100644
index 0000000..80e89e6
--- /dev/null
+++ b/xos/synchronizers/new_base/eventstep.py
@@ -0,0 +1,39 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class EventStep(object):
+ """
+ All the event steps defined in each synchronizer needs to inherit from this class in order to be loaded
+
+ Each step should define a technology, and either a `topics` or a `pattern`. The meaning of `topics` and `pattern`
+ depend on the technology that is chosen.
+ """
+
+ technology = "kafka"
+ topics = []
+ pattern = None
+
+ def __init__(self, log, **kwargs):
+ """
+ Initialize a pull step. Override this function to include any initialization. Make sure to call the original
+ __init__() from your method.
+ """
+
+ # self.log can be used to emit logging messages.
+ self.log = log
+
+ def process_event(self, event):
+ # This method must be overridden in your class. Do not call the original method.
+
+ self.log.warning("There is no default process_event, please provide a process_event method", msg=event)
\ No newline at end of file