blob: 0455f0d6f9d7e77b212b87565bde3e6c291ae694 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import imp
import inspect
import os
import threading
import time
import confluent_kafka
from xosconfig import Config
class XOSKafkaMessage:
def __init__(self, consumer_msg):
self.topic = consumer_msg.topic()
self.key = consumer_msg.key()
self.value = consumer_msg.value()
self.timestamp = None
(ts_type, ts_val) = consumer_msg.timestamp()
if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
self.timestamp = ts_val
class XOSKafkaThread(threading.Thread):
""" XOSKafkaThread
A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
function is called for each event.
"""
def __init__(self, step, bootstrap_servers, model_accessor, log, *args, **kwargs):
super(XOSKafkaThread, self).__init__(*args, **kwargs)
self.consumer = None
self.step = step
self.bootstrap_servers = bootstrap_servers
self.model_accessor = model_accessor
self.log = log
self.daemon = True
def create_kafka_consumer(self):
# use the service name as the group id
consumer_config = {
"group.id": Config().get("name"),
"bootstrap.servers": ",".join(self.bootstrap_servers),
"default.topic.config": {"auto.offset.reset": "smallest"},
}
return confluent_kafka.Consumer(**consumer_config)
def run(self):
if (not self.step.topics) and (not self.step.pattern):
raise Exception(
"Neither topics nor pattern is defined for step %s" % self.step.__name__
)
if self.step.topics and self.step.pattern:
raise Exception(
"Both topics and pattern are defined for step %s. Choose one."
% self.step.__name__
)
self.log.info(
"Waiting for events",
topic=self.step.topics,
pattern=self.step.pattern,
step=self.step.__name__,
)
while True:
try:
# setup consumer or loop on failure
if self.consumer is None:
self.consumer = self.create_kafka_consumer()
if self.step.topics:
self.consumer.subscribe(self.step.topics)
elif self.step.pattern:
self.consumer.subscribe(self.step.pattern)
except confluent_kafka.KafkaError._ALL_BROKERS_DOWN as e:
self.log.warning(
"No brokers available on %s, %s" % (self.bootstrap_servers, e)
)
time.sleep(20)
continue
except confluent_kafka.KafkaError as e:
# Maybe Kafka has not started yet. Log the exception and try again in a second.
self.log.exception("Exception in kafka loop: %s" % e)
time.sleep(1)
continue
# wait until we get a message, if no message, loop again
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
self.log.debug(
"Reached end of kafka topic %s, partition: %s, offset: %d"
% (msg.topic(), msg.partition(), msg.offset())
)
else:
self.log.exception("Error in kafka message: %s" % msg.error())
else:
# wrap parsing the event in a class
event_msg = XOSKafkaMessage(msg)
self.log.info(
"Processing event", event_msg=event_msg, step=self.step.__name__
)
try:
self.step(
model_accessor=self.model_accessor, log=self.log
).process_event(event_msg)
except BaseException:
self.log.exception(
"Exception in event step",
event_msg=event_msg,
step=self.step.__name__,
)
class XOSEventEngine(object):
""" XOSEventEngine
Subscribe to and handle processing of events. Two methods are defined:
load_step_modules(dir) ... look for step modules in the given directory, and load objects that are
descendant from EventStep.
start() ... Launch threads to handle processing of the EventSteps. It's expected that load_step_modules()
will be called before start().
"""
def __init__(self, model_accessor, log):
self.event_steps = []
self.threads = []
self.model_accessor = model_accessor
self.log = log
def load_event_step_modules(self, event_step_dir):
self.event_steps = []
self.log.info("Loading event steps", event_step_dir=event_step_dir)
# NOTE we'll load all the classes that inherit from EventStep
for fn in os.listdir(event_step_dir):
pathname = os.path.join(event_step_dir, fn)
if (
os.path.isfile(pathname)
and fn.endswith(".py")
and (fn != "__init__.py")
and ("test" not in fn)
):
event_module = imp.load_source(fn[:-3], pathname)
for classname in dir(event_module):
c = getattr(event_module, classname, None)
if inspect.isclass(c):
base_names = [b.__name__ for b in c.__bases__]
if "EventStep" in base_names:
self.event_steps.append(c)
self.log.info("Loaded event steps", steps=self.event_steps)
def start(self):
eventbus_kind = Config.get("event_bus.kind")
eventbus_endpoint = Config.get("event_bus.endpoint")
if not eventbus_kind:
self.log.error(
"Eventbus kind is not configured in synchronizer config file."
)
return
if eventbus_kind not in ["kafka"]:
self.log.error(
"Eventbus kind is set to a technology we do not implement.",
eventbus_kind=eventbus_kind,
)
return
if not eventbus_endpoint:
self.log.error(
"Eventbus endpoint is not configured in synchronizer config file."
)
return
for step in self.event_steps:
if step.technology == "kafka":
thread = XOSKafkaThread(
step, [eventbus_endpoint], self.model_accessor, self.log
)
thread.start()
self.threads.append(thread)
else:
self.log.error(
"Unknown technology. Skipping step",
technology=step.technology,
step=step.__name__,
)