blob: 694a1a85330317ac03c08d5fda41e109d19cb649 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import confluent_kafka
16import imp
17import inspect
18import os
19import threading
20import time
21from xosconfig import Config
22
23
class XOSKafkaMessage(object):
    """ XOSKafkaMessage

        Plain snapshot of a message returned by the Kafka consumer. The values
        are copied out of the consumer message once, in the constructor:

            topic, key, value ... results of the same-named accessors on consumer_msg
            timestamp ... second element of consumer_msg.timestamp(), or None when the
                          first element is confluent_kafka.TIMESTAMP_NOT_AVAILABLE
    """

    def __init__(self, consumer_msg):
        self.topic = consumer_msg.topic()
        self.key = consumer_msg.key()
        self.value = consumer_msg.value()

        self.timestamp = None
        (ts_type, ts_val) = consumer_msg.timestamp()

        # TIMESTAMP_NOT_AVAILABLE is an integer constant: compare by value, not identity.
        if ts_type != confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
            self.timestamp = ts_val

    def __repr__(self):
        # The payload is deliberately left out so that log lines carrying this
        # object as a field stay short.
        return "XOSKafkaMessage(topic=%r, key=%r, timestamp=%r)" % (
            self.topic,
            self.key,
            self.timestamp,
        )
36
37
class XOSKafkaThread(threading.Thread):
    """ XOSKafkaThread

        A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
        Consumer is launched to listen on the topics (or pattern) specified by the step. For each event a
        new instance of the step is created and its process_event() function is called.
    """

    def __init__(self, step, bootstrap_servers, model_accessor, log, *args, **kwargs):
        """ step ... event step class; run() reads its topics, pattern and __name__ attributes
            bootstrap_servers ... iterable of broker addresses, joined with "," for the consumer
            model_accessor, log ... passed to every step instance; log is also used by the thread
            *args, **kwargs ... forwarded to threading.Thread
        """
        super(XOSKafkaThread, self).__init__(*args, **kwargs)
        self.consumer = None
        self.step = step
        self.bootstrap_servers = bootstrap_servers
        self.model_accessor = model_accessor
        self.log = log
        self.daemon = True

    def create_kafka_consumer(self):
        """ Build a new confluent_kafka.Consumer for this thread's brokers. """
        # use the service name as the group id
        consumer_config = {
            "group.id": Config().get("name"),
            "bootstrap.servers": ",".join(self.bootstrap_servers),
            "default.topic.config": {"auto.offset.reset": "smallest"},
        }

        return confluent_kafka.Consumer(**consumer_config)

    def _subscription(self):
        """ Return the list that is handed to Consumer.subscribe(). """
        if self.step.topics:
            return self.step.topics

        # subscribe() only accepts a list, so a single pattern string is wrapped.
        pattern = self.step.pattern
        return pattern if isinstance(pattern, list) else [pattern]

    def _connect(self):
        """ Create and subscribe a consumer; self.consumer is only set once both succeeded. """
        consumer = self.create_kafka_consumer()
        try:
            consumer.subscribe(self._subscription())
        except Exception:
            # Do not keep (or leak) a consumer that never got its subscription;
            # the next loop iteration starts over with a fresh one.
            consumer.close()
            raise
        self.consumer = consumer

    def _handle_message(self, msg):
        """ Log a consumer error, or wrap the message and pass it to a new step instance. """
        error = msg.error()
        if error:
            if error.code() == confluent_kafka.KafkaError._PARTITION_EOF:
                self.log.debug(
                    "Reached end of kafka topic %s, partition: %s, offset: %d"
                    % (msg.topic(), msg.partition(), msg.offset())
                )
            else:
                # No exception is being handled here, so there is no traceback to log.
                self.log.error("Error in kafka message: %s" % error)
            return

        # wrap parsing the event in a class
        event_msg = XOSKafkaMessage(msg)

        self.log.info(
            "Processing event", event_msg=event_msg, step=self.step.__name__
        )

        try:
            self.step(model_accessor=self.model_accessor, log=self.log).process_event(event_msg)
        except BaseException:
            # A failing step must not end the thread: log it and go on with the next event.
            self.log.exception(
                "Exception in event step",
                event_msg=event_msg,
                step=self.step.__name__,
            )

    def run(self):
        """ Validate the step, then connect and poll forever.

            Raises Exception if the step defines neither, or both, of topics and pattern.
        """
        if (not self.step.topics) and (not self.step.pattern):
            raise Exception(
                "Neither topics nor pattern is defined for step %s" % self.step.__name__
            )

        if self.step.topics and self.step.pattern:
            raise Exception(
                "Both topics and pattern are defined for step %s. Choose one."
                % self.step.__name__
            )

        self.log.info(
            "Waiting for events",
            topic=self.step.topics,
            pattern=self.step.pattern,
            step=self.step.__name__,
        )

        while True:
            # setup consumer or loop on failure
            if self.consumer is None:
                try:
                    self._connect()
                except confluent_kafka.KafkaException as e:
                    # KafkaException carries the KafkaError as its first argument;
                    # _ALL_BROKERS_DOWN is an error code, not an exception class.
                    kafka_error = e.args[0] if e.args else None
                    code = kafka_error.code() if hasattr(kafka_error, "code") else None

                    if code == confluent_kafka.KafkaError._ALL_BROKERS_DOWN:
                        self.log.warning(
                            "No brokers available on %s, %s" % (self.bootstrap_servers, e)
                        )
                        time.sleep(20)
                    else:
                        # Maybe Kafka has not started yet. Log the exception and try again in a second.
                        self.log.exception("Exception in kafka loop: %s" % e)
                        time.sleep(1)
                    continue

            # wait until we get a message, if no message, loop again
            msg = self.consumer.poll(timeout=1.0)

            if msg is None:
                continue

            self._handle_message(msg)
141
142
class XOSEventEngine(object):
    """ XOSEventEngine

        Subscribe to and handle processing of events. Two methods are defined:

            load_event_step_modules(dir) ... look for step modules in the given directory, and load
                                             classes that have a direct base class named EventStep.

            start() ... Launch threads to handle processing of the EventSteps. It's expected that
                        load_event_step_modules() will be called before start().
    """

    def __init__(self, model_accessor, log):
        self.event_steps = []
        self.threads = []
        self.model_accessor = model_accessor
        self.log = log

    def load_event_step_modules(self, event_step_dir):
        """ Replace self.event_steps with the step classes found in event_step_dir.

            Only regular files ending in ".py" are loaded; "__init__.py" and any file whose
            name contains "test" are skipped.
        """
        self.event_steps = []
        self.log.info("Loading event steps", event_step_dir=event_step_dir)

        # os.listdir() gives no ordering guarantee; sort so that steps are always
        # loaded (and later started) in the same order.
        for fn in sorted(os.listdir(event_step_dir)):
            pathname = os.path.join(event_step_dir, fn)
            if not (
                os.path.isfile(pathname)
                and fn.endswith(".py")
                and (fn != "__init__.py")
                and ("test" not in fn)
            ):
                continue

            event_module = imp.load_source(fn[:-3], pathname)

            # NOTE we'll load all the classes that inherit directly from EventStep
            for classname in dir(event_module):
                c = getattr(event_module, classname, None)

                if not inspect.isclass(c):
                    continue

                base_names = [b.__name__ for b in c.__bases__]
                # A class can be reachable under more than one name (e.g. an alias);
                # register it once so that it does not get two threads.
                if "EventStep" in base_names and c not in self.event_steps:
                    self.event_steps.append(c)

        self.log.info("Loaded event steps", steps=self.event_steps)

    def start(self):
        """ Start one XOSKafkaThread per loaded step whose technology is "kafka".

            Logs an error and returns without starting anything when event_bus.kind or
            event_bus.endpoint is not configured, or when the kind is not "kafka".
        """
        eventbus_kind = Config.get("event_bus.kind")
        eventbus_endpoint = Config.get("event_bus.endpoint")

        if not eventbus_kind:
            self.log.error(
                "Eventbus kind is not configured in synchronizer config file."
            )
            return

        if eventbus_kind not in ["kafka"]:
            self.log.error(
                "Eventbus kind is set to a technology we do not implement.",
                eventbus_kind=eventbus_kind,
            )
            return

        if not eventbus_endpoint:
            self.log.error(
                "Eventbus endpoint is not configured in synchronizer config file."
            )
            return

        for step in self.event_steps:
            if step.technology != "kafka":
                self.log.error(
                    "Unknown technology. Skipping step",
                    technology=step.technology,
                    step=step.__name__,
                )
                continue

            thread = XOSKafkaThread(step, [eventbus_endpoint], self.model_accessor, self.log)
            thread.start()
            self.threads.append(thread)