blob: 0455f0d6f9d7e77b212b87565bde3e6c291ae694 [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
Zack Williams5c2ea232019-01-30 15:23:01 -070015from __future__ import absolute_import
16
Scott Bakerbba67b62019-01-28 17:38:21 -080017import imp
18import inspect
19import os
20import threading
21import time
Zack Williams5c2ea232019-01-30 15:23:01 -070022
23import confluent_kafka
24
Scott Bakerbba67b62019-01-28 17:38:21 -080025from xosconfig import Config
26
27
28class XOSKafkaMessage:
29 def __init__(self, consumer_msg):
30
31 self.topic = consumer_msg.topic()
32 self.key = consumer_msg.key()
33 self.value = consumer_msg.value()
34
35 self.timestamp = None
36 (ts_type, ts_val) = consumer_msg.timestamp()
37
38 if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
39 self.timestamp = ts_val
40
41
42class XOSKafkaThread(threading.Thread):
43 """ XOSKafkaThread
44
45 A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
46 Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
47 function is called for each event.
48 """
49
Scott Bakerc2fddaa2019-01-30 15:45:03 -080050 def __init__(self, step, bootstrap_servers, model_accessor, log, *args, **kwargs):
Scott Bakerbba67b62019-01-28 17:38:21 -080051 super(XOSKafkaThread, self).__init__(*args, **kwargs)
52 self.consumer = None
53 self.step = step
54 self.bootstrap_servers = bootstrap_servers
Scott Bakerc2fddaa2019-01-30 15:45:03 -080055 self.model_accessor = model_accessor
Scott Bakerbba67b62019-01-28 17:38:21 -080056 self.log = log
57 self.daemon = True
58
59 def create_kafka_consumer(self):
60 # use the service name as the group id
61 consumer_config = {
62 "group.id": Config().get("name"),
63 "bootstrap.servers": ",".join(self.bootstrap_servers),
64 "default.topic.config": {"auto.offset.reset": "smallest"},
65 }
66
67 return confluent_kafka.Consumer(**consumer_config)
68
69 def run(self):
70 if (not self.step.topics) and (not self.step.pattern):
71 raise Exception(
72 "Neither topics nor pattern is defined for step %s" % self.step.__name__
73 )
74
75 if self.step.topics and self.step.pattern:
76 raise Exception(
77 "Both topics and pattern are defined for step %s. Choose one."
78 % self.step.__name__
79 )
80
81 self.log.info(
82 "Waiting for events",
83 topic=self.step.topics,
84 pattern=self.step.pattern,
85 step=self.step.__name__,
86 )
87
88 while True:
89 try:
90 # setup consumer or loop on failure
91 if self.consumer is None:
92 self.consumer = self.create_kafka_consumer()
93
94 if self.step.topics:
95 self.consumer.subscribe(self.step.topics)
96
97 elif self.step.pattern:
98 self.consumer.subscribe(self.step.pattern)
99
100 except confluent_kafka.KafkaError._ALL_BROKERS_DOWN as e:
101 self.log.warning(
102 "No brokers available on %s, %s" % (self.bootstrap_servers, e)
103 )
104 time.sleep(20)
105 continue
106
107 except confluent_kafka.KafkaError as e:
108 # Maybe Kafka has not started yet. Log the exception and try again in a second.
109 self.log.exception("Exception in kafka loop: %s" % e)
110 time.sleep(1)
111 continue
112
113 # wait until we get a message, if no message, loop again
114 msg = self.consumer.poll(timeout=1.0)
115
116 if msg is None:
117 continue
118
119 if msg.error():
120 if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
121 self.log.debug(
122 "Reached end of kafka topic %s, partition: %s, offset: %d"
123 % (msg.topic(), msg.partition(), msg.offset())
124 )
125 else:
126 self.log.exception("Error in kafka message: %s" % msg.error())
127
128 else:
129 # wrap parsing the event in a class
130 event_msg = XOSKafkaMessage(msg)
131
132 self.log.info(
133 "Processing event", event_msg=event_msg, step=self.step.__name__
134 )
135
136 try:
Zack Williams5c2ea232019-01-30 15:23:01 -0700137 self.step(
138 model_accessor=self.model_accessor, log=self.log
139 ).process_event(event_msg)
Scott Bakerbba67b62019-01-28 17:38:21 -0800140
141 except BaseException:
142 self.log.exception(
143 "Exception in event step",
144 event_msg=event_msg,
145 step=self.step.__name__,
146 )
147
148
149class XOSEventEngine(object):
150 """ XOSEventEngine
151
152 Subscribe to and handle processing of events. Two methods are defined:
153
154 load_step_modules(dir) ... look for step modules in the given directory, and load objects that are
155 descendant from EventStep.
156
157 start() ... Launch threads to handle processing of the EventSteps. It's expected that load_step_modules()
158 will be called before start().
159 """
160
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800161 def __init__(self, model_accessor, log):
Scott Bakerbba67b62019-01-28 17:38:21 -0800162 self.event_steps = []
163 self.threads = []
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800164 self.model_accessor = model_accessor
Scott Bakerbba67b62019-01-28 17:38:21 -0800165 self.log = log
166
167 def load_event_step_modules(self, event_step_dir):
168 self.event_steps = []
169 self.log.info("Loading event steps", event_step_dir=event_step_dir)
170
171 # NOTE we'll load all the classes that inherit from EventStep
172 for fn in os.listdir(event_step_dir):
173 pathname = os.path.join(event_step_dir, fn)
174 if (
175 os.path.isfile(pathname)
176 and fn.endswith(".py")
177 and (fn != "__init__.py")
178 and ("test" not in fn)
179 ):
180 event_module = imp.load_source(fn[:-3], pathname)
181
182 for classname in dir(event_module):
183 c = getattr(event_module, classname, None)
184
185 if inspect.isclass(c):
186 base_names = [b.__name__ for b in c.__bases__]
187 if "EventStep" in base_names:
188 self.event_steps.append(c)
189 self.log.info("Loaded event steps", steps=self.event_steps)
190
191 def start(self):
192 eventbus_kind = Config.get("event_bus.kind")
193 eventbus_endpoint = Config.get("event_bus.endpoint")
194
195 if not eventbus_kind:
196 self.log.error(
197 "Eventbus kind is not configured in synchronizer config file."
198 )
199 return
200
201 if eventbus_kind not in ["kafka"]:
202 self.log.error(
203 "Eventbus kind is set to a technology we do not implement.",
204 eventbus_kind=eventbus_kind,
205 )
206 return
207
208 if not eventbus_endpoint:
209 self.log.error(
210 "Eventbus endpoint is not configured in synchronizer config file."
211 )
212 return
213
214 for step in self.event_steps:
215 if step.technology == "kafka":
Zack Williams5c2ea232019-01-30 15:23:01 -0700216 thread = XOSKafkaThread(
217 step, [eventbus_endpoint], self.model_accessor, self.log
218 )
Scott Bakerbba67b62019-01-28 17:38:21 -0800219 thread.start()
220 self.threads.append(thread)
221 else:
222 self.log.error(
223 "Unknown technology. Skipping step",
224 technology=step.technology,
225 step=step.__name__,
226 )