blob: e5e18d124470e39d3a1f5e6a780d34436e517937 [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import confluent_kafka
16import imp
17import inspect
18import os
19import threading
20import time
21from xosconfig import Config
22
23
24class XOSKafkaMessage:
25 def __init__(self, consumer_msg):
26
27 self.topic = consumer_msg.topic()
28 self.key = consumer_msg.key()
29 self.value = consumer_msg.value()
30
31 self.timestamp = None
32 (ts_type, ts_val) = consumer_msg.timestamp()
33
34 if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
35 self.timestamp = ts_val
36
37
38class XOSKafkaThread(threading.Thread):
39 """ XOSKafkaThread
40
41 A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
42 Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
43 function is called for each event.
44 """
45
46 def __init__(self, step, bootstrap_servers, log, *args, **kwargs):
47 super(XOSKafkaThread, self).__init__(*args, **kwargs)
48 self.consumer = None
49 self.step = step
50 self.bootstrap_servers = bootstrap_servers
51 self.log = log
52 self.daemon = True
53
54 def create_kafka_consumer(self):
55 # use the service name as the group id
56 consumer_config = {
57 "group.id": Config().get("name"),
58 "bootstrap.servers": ",".join(self.bootstrap_servers),
59 "default.topic.config": {"auto.offset.reset": "smallest"},
60 }
61
62 return confluent_kafka.Consumer(**consumer_config)
63
64 def run(self):
65 if (not self.step.topics) and (not self.step.pattern):
66 raise Exception(
67 "Neither topics nor pattern is defined for step %s" % self.step.__name__
68 )
69
70 if self.step.topics and self.step.pattern:
71 raise Exception(
72 "Both topics and pattern are defined for step %s. Choose one."
73 % self.step.__name__
74 )
75
76 self.log.info(
77 "Waiting for events",
78 topic=self.step.topics,
79 pattern=self.step.pattern,
80 step=self.step.__name__,
81 )
82
83 while True:
84 try:
85 # setup consumer or loop on failure
86 if self.consumer is None:
87 self.consumer = self.create_kafka_consumer()
88
89 if self.step.topics:
90 self.consumer.subscribe(self.step.topics)
91
92 elif self.step.pattern:
93 self.consumer.subscribe(self.step.pattern)
94
95 except confluent_kafka.KafkaError._ALL_BROKERS_DOWN as e:
96 self.log.warning(
97 "No brokers available on %s, %s" % (self.bootstrap_servers, e)
98 )
99 time.sleep(20)
100 continue
101
102 except confluent_kafka.KafkaError as e:
103 # Maybe Kafka has not started yet. Log the exception and try again in a second.
104 self.log.exception("Exception in kafka loop: %s" % e)
105 time.sleep(1)
106 continue
107
108 # wait until we get a message, if no message, loop again
109 msg = self.consumer.poll(timeout=1.0)
110
111 if msg is None:
112 continue
113
114 if msg.error():
115 if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
116 self.log.debug(
117 "Reached end of kafka topic %s, partition: %s, offset: %d"
118 % (msg.topic(), msg.partition(), msg.offset())
119 )
120 else:
121 self.log.exception("Error in kafka message: %s" % msg.error())
122
123 else:
124 # wrap parsing the event in a class
125 event_msg = XOSKafkaMessage(msg)
126
127 self.log.info(
128 "Processing event", event_msg=event_msg, step=self.step.__name__
129 )
130
131 try:
132 self.step(log=self.log).process_event(event_msg)
133
134 except BaseException:
135 self.log.exception(
136 "Exception in event step",
137 event_msg=event_msg,
138 step=self.step.__name__,
139 )
140
141
142class XOSEventEngine(object):
143 """ XOSEventEngine
144
145 Subscribe to and handle processing of events. Two methods are defined:
146
147 load_step_modules(dir) ... look for step modules in the given directory, and load objects that are
148 descendant from EventStep.
149
150 start() ... Launch threads to handle processing of the EventSteps. It's expected that load_step_modules()
151 will be called before start().
152 """
153
154 def __init__(self, log):
155 self.event_steps = []
156 self.threads = []
157 self.log = log
158
159 def load_event_step_modules(self, event_step_dir):
160 self.event_steps = []
161 self.log.info("Loading event steps", event_step_dir=event_step_dir)
162
163 # NOTE we'll load all the classes that inherit from EventStep
164 for fn in os.listdir(event_step_dir):
165 pathname = os.path.join(event_step_dir, fn)
166 if (
167 os.path.isfile(pathname)
168 and fn.endswith(".py")
169 and (fn != "__init__.py")
170 and ("test" not in fn)
171 ):
172 event_module = imp.load_source(fn[:-3], pathname)
173
174 for classname in dir(event_module):
175 c = getattr(event_module, classname, None)
176
177 if inspect.isclass(c):
178 base_names = [b.__name__ for b in c.__bases__]
179 if "EventStep" in base_names:
180 self.event_steps.append(c)
181 self.log.info("Loaded event steps", steps=self.event_steps)
182
183 def start(self):
184 eventbus_kind = Config.get("event_bus.kind")
185 eventbus_endpoint = Config.get("event_bus.endpoint")
186
187 if not eventbus_kind:
188 self.log.error(
189 "Eventbus kind is not configured in synchronizer config file."
190 )
191 return
192
193 if eventbus_kind not in ["kafka"]:
194 self.log.error(
195 "Eventbus kind is set to a technology we do not implement.",
196 eventbus_kind=eventbus_kind,
197 )
198 return
199
200 if not eventbus_endpoint:
201 self.log.error(
202 "Eventbus endpoint is not configured in synchronizer config file."
203 )
204 return
205
206 for step in self.event_steps:
207 if step.technology == "kafka":
208 thread = XOSKafkaThread(step, [eventbus_endpoint], self.log)
209 thread.start()
210 self.threads.append(thread)
211 else:
212 self.log.error(
213 "Unknown technology. Skipping step",
214 technology=step.technology,
215 step=step.__name__,
216 )