blob: 07508b9ab1f52b69ecde0f95dd6314b92629a802 [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#!/usr/bin/env python
2
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
Zack Williams84a71e92019-11-15 09:00:19 -070017from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060018import time
19from uuid import uuid4
20
21import structlog
22from twisted.internet import reactor
23from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
24 DeferredQueue, gatherResults
25from zope.interface import implementer
26
27from pyvoltha.common.utils import asleep
28from pyvoltha.common.utils.registry import IComponent
Zack Williams84a71e92019-11-15 09:00:19 -070029from .kafka_proxy import KafkaProxy, get_kafka_proxy
William Kurkianede82e92019-03-05 13:02:57 -050030from voltha_protos.inter_container_pb2 import MessageType, Argument, \
Chip Boling67b674a2019-02-08 11:42:18 -060031 InterContainerRequestBody, InterContainerMessage, Header, \
khenaidoo944aee72019-02-28 11:00:24 -050032 InterContainerResponseBody, StrType
Zack Williams84a71e92019-11-15 09:00:19 -070033import six
34import codecs
Chip Boling67b674a2019-02-08 11:42:18 -060035
# Module-wide structured logger used by everything in this file.
log = structlog.get_logger()

# Symbolic topic offsets accepted by subscribe(); 'latest' is its default.
KAFKA_OFFSET_LATEST = 'latest'
KAFKA_OFFSET_EARLIEST = 'earliest'
# Key of the extra request argument that carries the sender's reply topic.
ARG_FROM_TOPIC = 'fromTopic'
Chip Boling67b674a2019-02-08 11:42:18 -060041
42
class KafkaMessagingError(Exception):
    """
    Raised when an inter-container Kafka messaging operation fails.

    :param error: Description of the failure, or the underlying exception.
    It is kept on ``self.error`` and also forwarded to ``Exception`` so that
    ``str(exc)``, ``exc.args`` and logged tracebacks show it.
    """

    def __init__(self, error):
        # Without this call the base class holds no args and str(exc) is ''.
        super(KafkaMessagingError, self).__init__(error)
        self.error = error
46
47
48@implementer(IComponent)
49class IKafkaMessagingProxy(object):
50 _kafka_messaging_instance = None
51
def __init__(self,
             kafka_host_port,
             kv_store,
             default_topic,
             group_id_prefix,
             target_cls):
    """
    Initialize the kafka messaging proxy. This is a singleton (may change
    to non-singleton if performance is better).

    :param kafka_host_port: Kafka host and port
    :param kv_store: Key-Value store
    :param default_topic: Default topic to subscribe to
    :param group_id_prefix: Prefix joined to default_topic with "_" to form
    the default consumer group id
    :param target_cls: target class - method of that class is invoked
    when a message is received on the default_topic
    :raises Exception: if a singleton instance has already been registered
    """
    # Refuse to build a second instance once one has been registered.
    if IKafkaMessagingProxy._kafka_messaging_instance:
        # Exception() takes no keyword arguments; passing ``cls=`` raised a
        # TypeError instead of the intended error, so pass it positionally.
        raise Exception('Singleton-exist', IKafkaMessagingProxy)

    log.debug("Initializing-KafkaProxy")
    self.kafka_host_port = kafka_host_port
    self.kv_store = kv_store
    self.default_topic = default_topic
    self.default_group_id = "_".join((group_id_prefix, default_topic))
    self.target_cls = target_cls
    # topic -> target class whose methods serve requests on that topic
    self.topic_target_cls_map = {}
    # topic -> list of raw callbacks invoked with each message value
    self.topic_callback_map = {}
    self.subscribers = {}
    # Populated by start()
    self.kafka_proxy = None
    # transaction id -> Deferred fired when the matching response arrives
    self.transaction_id_deferred_map = {}
    # Every received message is queued here before being processed
    self.received_msg_queue = DeferredQueue()
    # Set by stop(); the processing loop checks it after each message
    self.stopped = False

    # Timing/statistics counters; only initialised here in the visible code.
    self.init_time = 0
    self.init_received_time = 0

    self.init_resp_time = 0
    self.init_received_resp_time = 0

    self.num_messages = 0
    self.total_time = 0
    self.num_responses = 0
    self.total_time_responses = 0
    log.debug("KafkaProxy-initialized")
97
def start(self):
    """
    Start the messaging proxy.

    Obtains (creating it if necessary) the shared kafka proxy, schedules
    the received-message processing loop and the subscription to the
    default topic on the reactor, then registers this object as the
    singleton instance. Any failure is logged and not re-raised.

    :return: None
    """
    try:
        log.debug("KafkaProxy-starting")

        # Get the kafka proxy instance. If it does not exist then
        # create it
        self.kafka_proxy = get_kafka_proxy()
        # Identity test: a proxy object must never compare equal to None
        # through a custom __eq__.
        if self.kafka_proxy is None:
            KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
            self.kafka_proxy = get_kafka_proxy()

        # Subscribe the default topic and target_cls
        self.topic_target_cls_map[self.default_topic] = self.target_cls

        # Start the queue to handle incoming messages
        reactor.callLater(0, self._received_message_processing_loop)

        # Subscribe using the default topic and default group id. Whenever
        # a message is received on that topic then the target_cls will be
        # invoked.
        reactor.callLater(0, self.subscribe,
                          topic=self.default_topic,
                          target_cls=self.target_cls,
                          group_id=self.default_group_id)

        # Setup the singleton instance
        IKafkaMessagingProxy._kafka_messaging_instance = self
        log.debug("KafkaProxy-started")
    except Exception as e:
        log.exception("Failed-to-start-proxy", e=e)
128
def stop(self):
    """
    Stop the messaging proxy.

    Flags the proxy as stopped and stops the underlying kafka proxy (and
    with it all consumers and producers). A failure is logged rather than
    propagated.

    :return: None
    """
    log.debug("Stopping-messaging-proxy ...")
    try:
        # Flag first so the processing loop can exit on its next message.
        self.stopped = True
        proxy = self.kafka_proxy
        proxy.stop()
        log.debug("Messaging-proxy-stopped.")
    except Exception as err:
        log.exception("Exception-when-stopping-messaging-proxy:", e=err)
143
def get_target_cls(self):
    """
    :return: the target class supplied when this proxy was constructed
    """
    target = self.target_cls
    return target
146
def get_default_topic(self):
    """
    :return: the default topic supplied when this proxy was constructed
    """
    topic = self.default_topic
    return topic
149
@inlineCallbacks
def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
                              target_cls=None):
    """
    Subscribe a group consumer to a topic and record how its messages
    should be dispatched.

    Exactly one of callback / target_cls is expected. With only a
    target_cls it becomes the topic's target class unless one is already
    recorded. With only a callback it is appended to the topic's callback
    list. Any other combination is logged as a warning, and the call still
    reports success.

    :param group_id: consumer group id passed to the kafka proxy
    :param topic: topic to subscribe to
    :param offset: starting offset passed to the kafka proxy
    :param callback: optional callback for raw message handling
    :param target_cls: optional target class for request/response handling
    :return: (via returnValue) True on success, False if anything raised
    """
    try:
        log.debug("subscribing-to-topic-start", topic=topic)
        yield self.kafka_proxy.subscribe(topic,
                                         self._enqueue_received_group_message,
                                         group_id, offset)

        has_target = target_cls is not None
        has_callback = callback is not None

        if has_target and not has_callback:
            # Scenario #1: keep the first target class seen for the topic
            self.topic_target_cls_map.setdefault(topic, target_cls)
        elif has_callback and not has_target:
            # Scenario #2: several callbacks may share one topic
            log.debug("custom-callback", topic=topic,
                      callback_map=self.topic_callback_map)
            self.topic_callback_map.setdefault(topic, []).append(callback)
        else:
            log.warn("invalid-parameters")

        returnValue(True)
    except Exception as err:
        log.exception("Exception-during-subscription", e=err)
        returnValue(False)
178
@inlineCallbacks
def subscribe(self, topic, callback=None, target_cls=None,
              max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
    """
    Scenario 1: invoked to subscribe to a specific topic with a
    target_cls to invoke when a message is received on that topic. This
    handles the case of request/response where this library performs the
    heavy lifting. In this case the callback must be None.

    Scenario 2: invoked to subscribe to a specific topic with a
    specific callback to invoke when a message is received on that topic.
    This handles the case where the caller wants to process the message
    received itself. In this case the target_cls must be None.

    :param topic: topic to subscribe to
    :param callback: Callback to invoke when a message is received on
    the topic. Exactly one of callback and target_cls should be given.
    :param target_cls: Target class to use when a message is
    received on the topic. There can only be 1 target_cls per topic.
    Exactly one of callback and target_cls should be given.
    :param max_retry: the number of retries before reporting failure
    to subscribe. This caters for scenario where the kafka topic is not
    ready.
    :param group_id: The ID of the group the consumer is subscribing to;
    defaults to this proxy's default group id
    :param offset: The topic offset on the kafka bus from where message
    consumption will start
    :return: True on success, False on failure
    """
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def _backoff(msg, retries):
        # Clamp to the last entry once the retry count exceeds the table.
        wait_time = RETRY_BACKOFF[min(retries,
                                      len(RETRY_BACKOFF) - 1)]
        log.info(msg, retry_in=wait_time)
        return asleep.asleep(wait_time)

    log.debug("subscribing", topic=topic, group_id=group_id,
              callback=callback, target=target_cls)

    if group_id is None:
        group_id = self.default_group_id

    retry = 0
    while True:
        subscribed = yield self._subscribe_group_consumer(group_id, topic,
                                                          callback=callback,
                                                          target_cls=target_cls,
                                                          offset=offset)
        if subscribed:
            returnValue(True)
        if retry > max_retry:
            returnValue(False)

        # The sleep Deferred must be yielded. Previously it was created and
        # discarded, so every retry fired immediately with no back-off.
        yield _backoff("subscription-not-complete", retry)
        retry += 1
232 retry += 1
233
def unsubscribe(self, topic, callback=None, target_cls=None):
    """
    Invoked when unsubscribing from a topic.

    The kafka-level unsubscribe is issued first. The local dispatch tables
    are then cleaned up: the topic's target class is dropped when a
    target_cls is given, and the first matching callback is removed when a
    callback is given (the topic's callback list is deleted once empty).

    :param topic: topic to unsubscribe from
    :param callback: the callback used when subscribing to the topic, if any
    :param target_cls: the target class used when subscribing to the topic,
    if any
    :return: None on success; on failure the exception is logged and
    returned (not raised)
    """
    log.debug("Unsubscribing-to-topic", topic=topic)

    try:
        self.kafka_proxy.unsubscribe(topic,
                                     self._enqueue_received_group_message)

        if callback is None and target_cls is None:
            log.error("both-call-and-target-cls-cannot-be-none",
                      topic=topic)
            raise KafkaMessagingError(
                error="both-call-and-target-cls-cannot-be-none")

        if target_cls is not None:
            self.topic_target_cls_map.pop(topic, None)

        if callback is not None and topic in self.topic_callback_map:
            registered = self.topic_callback_map[topic]
            # Remove only the first matching entry, if there is one
            if callback in registered:
                registered.remove(callback)

            if not registered:
                del self.topic_callback_map[topic]
    except Exception as err:
        log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
                      e=err)
        return err
272
@inlineCallbacks
def _enqueue_received_group_message(self, msg):
    """
    Consumer callback registered with the kafka proxy: queue every
    received message, irrespective of topic, for the processing loop.

    :param msg: Received message
    :return: None; a failure to enqueue is logged, not raised
    """
    try:
        log.debug("received-msg", msg=msg)
        queue = self.received_msg_queue
        yield queue.put(msg)
    except Exception as err:
        log.exception("Failed-enqueueing-received-message", e=err)
286
@inlineCallbacks
def _received_message_processing_loop(self):
    """
    Drain the received-message queue forever, handing each message to
    _process_message on the next reactor iteration.

    The stop flag is only examined after a message has been dequeued and
    scheduled, so the loop exits on the first message that arrives after
    stop() was called. Dequeue failures are logged and the loop carries on.

    :return: None
    """
    while True:
        try:
            received = yield self.received_msg_queue.get()
            # Schedule instead of calling directly so processing one
            # message does not hold up dequeuing the next.
            reactor.callLater(0, self._process_message, received)
            if self.stopped:
                break
        except Exception as err:
            log.exception("Failed-dequeueing-received-message", e=err)
302
303 def _to_string(self, unicode_str):
304 if unicode_str is not None:
Zack Williams84a71e92019-11-15 09:00:19 -0700305 if isinstance(unicode_str, six.string_types):
Chip Boling67b674a2019-02-08 11:42:18 -0600306 return unicode_str
Zack Williams84a71e92019-11-15 09:00:19 -0700307 else:
308 return codecs.encode(unicode_str, 'ascii')
Chip Boling67b674a2019-02-08 11:42:18 -0600309 else:
310 return None
311
def _format_request(self,
                    rpc,
                    to_topic,
                    reply_topic,
                    **kwargs):
    """
    Format a request to send over kafka.

    :param rpc: Requested remote API
    :param to_topic: Topic to send the request
    :param reply_topic: Topic to receive the resulting response, if any.
    When falsy (None or empty) no response is requested.
    :param kwargs: Dictionary of key-value pairs to pass as arguments to
    the remote rpc API. A value that cannot be packed is logged and left
    out of the request.
    :return: A tuple (InterContainerMessage, transaction id,
    response_required) on success or (None, None, None) on failure
    """
    try:
        transaction_id = uuid4().hex
        request = InterContainerMessage()
        request_body = InterContainerRequestBody()
        request.header.id = transaction_id
        request.header.type = MessageType.Value("REQUEST")
        request.header.to_topic = to_topic

        response_required = bool(reply_topic)
        if response_required:
            # Set the reply fields only when a reply topic was supplied.
            # Protobuf string fields are expected to reject None, which
            # made the "no response wanted" case fail to format at all.
            request.header.from_topic = reply_topic
            request_body.reply_to_topic = reply_topic
            request_body.response_required = True

        request.header.timestamp.GetCurrentTime()
        request_body.rpc = rpc
        for key, value in six.iteritems(kwargs):
            arg = Argument()
            arg.key = key
            try:
                arg.value.Pack(value)
                request_body.args.extend([arg])
            except Exception as e:
                # Best effort: skip the offending argument, keep the rest
                log.exception("Failed-parsing-value", e=e)
        request.body.Pack(request_body)
        return request, transaction_id, response_required
    except Exception as e:
        log.exception("formatting-request-failed",
                      rpc=rpc,
                      to_topic=to_topic,
                      reply_topic=reply_topic,
                      args=kwargs,
                      e=e)
        return None, None, None
361
def _format_response(self, msg_header, msg_body, status):
    """
    Build the response message for a received request.

    The response reuses the request's id and swaps its from/to topics so
    it travels back to the requester.

    :param msg_header: The header portion of a received request
    :param msg_body: The response body to pack as the result; may be None
    :param status: True if this represents a successful response
    :return: an InterContainerMessage, or None if formatting failed
    """
    try:
        assert isinstance(msg_header, Header)
        reply = InterContainerMessage()

        reply_header = reply.header
        reply_header.id = msg_header.id
        reply_header.timestamp.GetCurrentTime()
        reply_header.type = MessageType.Value("RESPONSE")
        reply_header.from_topic = msg_header.to_topic
        reply_header.to_topic = msg_header.from_topic

        payload = InterContainerResponseBody()
        if msg_body is not None:
            payload.result.Pack(msg_body)
        payload.success = status
        reply.body.Pack(payload)
        return reply
    except Exception as err:
        log.exception("formatting-response-failed", header=msg_header,
                      body=msg_body, status=status, e=err)
        return None
388
def _parse_response(self, msg):
    """
    Decode a serialized InterContainerMessage and extract its response
    body.

    :param msg: serialized message bytes
    :return: the unpacked InterContainerResponseBody, or None when the
    body is of another type or parsing fails
    """
    try:
        envelope = InterContainerMessage()
        envelope.ParseFromString(msg)
        body = InterContainerResponseBody()

        if not envelope.body.Is(InterContainerResponseBody.DESCRIPTOR):
            log.debug("unsupported-msg", msg_type=type(envelope.body))
            return None

        envelope.body.Unpack(body)
        header = envelope.header
        log.debug("parsed-response", type=header.type,
                  from_topic=header.from_topic,
                  to_topic=header.to_topic, transaction_id=header.id)
        return body
    except Exception as err:
        log.exception("parsing-response-failed", msg=msg, e=err)
        return None
405
@inlineCallbacks
def _process_message(self, m):
    """
    Default internal method invoked for every message taken off the
    received-message queue.

    Custom callbacks registered for the message's topic are run first with
    the raw value. If the topic also has a target class, the value is
    parsed as an InterContainerMessage and handled as either:

    * a REQUEST: the rpc named in the body is invoked on the target class
      registered for the header's to_topic, with the request arguments plus
      the sender's reply topic; a response is sent when one is required;
    * a RESPONSE: the Deferred waiting on the transaction id, if any, is
      fired with the parsed response body.

    Any exception is logged and swallowed.

    :param m: received message exposing value() and topic()
    :return: None
    """

    def _augment_args_with_FromTopic(args, from_topic):
        # Append the sender's reply topic as an extra argument
        arg = Argument(key=ARG_FROM_TOPIC)
        t = StrType(val=from_topic)
        arg.value.Pack(t)
        args.extend([arg])
        return args

    def _toDict(args):
        """
        Convert a repeatable Argument type into a python dictionary
        :param args: Repeatable core_adapter.Argument type
        :return: a python dictionary
        """
        if args is None:
            return None
        result = {}
        for arg in args:
            assert isinstance(arg, Argument)
            result[arg.key] = arg.value
        return result

    try:
        val = m.value()

        # Go over customized callbacks first
        m_topic = m.topic()
        if m_topic in self.topic_callback_map:
            for c in self.topic_callback_map[m_topic]:
                yield c(val)

        # Check whether we need to process request/response scenario
        if m_topic not in self.topic_target_cls_map:
            return

        # Process request/response scenario
        message = InterContainerMessage()
        message.ParseFromString(val)

        if message.header.type == MessageType.Value("REQUEST"):
            # Get the target class for that specific topic
            targetted_topic = self._to_string(message.header.to_topic)
            msg_body = InterContainerRequestBody()
            if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
                message.body.Unpack(msg_body)
            else:
                log.debug("unsupported-msg", msg_type=type(message.body))
                return
            if targetted_topic in self.topic_target_cls_map:
                # Augment the request arguments with the from_topic
                augmented_args = _augment_args_with_FromTopic(
                    msg_body.args, msg_body.reply_to_topic)
                if augmented_args:
                    log.debug("message-body-args-present",
                              rpc=msg_body.rpc,
                              response_required=msg_body.response_required,
                              reply_to_topic=msg_body.reply_to_topic)
                    call_kwargs = _toDict(augmented_args)
                else:
                    log.debug("message-body-args-absent",
                              rpc=msg_body.rpc,
                              response_required=msg_body.response_required,
                              reply_to_topic=msg_body.reply_to_topic)
                    call_kwargs = {}

                # Resolve the rpc by name on the topic's target class
                handler = getattr(
                    self.topic_target_cls_map[targetted_topic],
                    self._to_string(msg_body.rpc))
                (status, res) = yield handler(**call_kwargs)

                if msg_body.response_required:
                    response = self._format_response(
                        msg_header=message.header,
                        msg_body=res,
                        status=status,
                    )
                    if response is not None:
                        res_topic = self._to_string(
                            response.header.to_topic)
                        # Not yielded: the send logs its own failures
                        self._send_kafka_message(res_topic, response)

                        log.debug("Response-sent",
                                  to_topic=res_topic)
        elif message.header.type == MessageType.Value("RESPONSE"):
            trns_id = self._to_string(message.header.id)
            log.debug('received-response', transaction_id=trns_id)
            if trns_id in self.transaction_id_deferred_map:
                resp = self._parse_response(val)

                # Wake up the send_request() call waiting on this id
                self.transaction_id_deferred_map[trns_id].callback(resp)
        else:
            log.error("!!INVALID-TRANSACTION-TYPE!!")

    except Exception as e:
        log.exception("Failed-to-process-message", message=m, e=e)
506
@inlineCallbacks
def _send_kafka_message(self, topic, msg):
    """
    Serialize a message and publish it on a topic through the kafka proxy.

    :param topic: topic to publish on
    :param msg: message object exposing SerializeToString()
    :return: None; a failure is logged, not raised
    """
    try:
        payload = msg.SerializeToString()
        yield self.kafka_proxy.send_message(topic, payload)
    except Exception as err:
        log.exception("Failed-sending-message", message=msg, e=err)
513
@inlineCallbacks
def send_request(self,
                 rpc,
                 to_topic,
                 reply_topic=None,
                 callback=None,
                 **kwargs):
    """
    Invoked to send a message to a remote container and receive a
    response if required.

    :param rpc: The remote API to invoke
    :param to_topic: Send the message to this kafka topic
    :param reply_topic: If not None then a response is expected on this
    topic. If set to None then no response is required.
    :param callback: Callback to invoke when a response is received. It is
    called with a (success, result) tuple, where result is None for an
    unsuccessful response.
    :param kwargs: Key-value pairs representing arguments to pass to the
    rpc remote API.
    :return: None when no response is required, when the request could
    not be formatted, or when the response was delivered via the callback.
    Otherwise a tuple (success, result), with result None on an
    unsuccessful response.
    :raises KafkaMessagingError: if the response could not be parsed or
    any other error occurred while sending
    """
    try:
        # Ensure all strings are not unicode encoded
        rpc = self._to_string(rpc)
        to_topic = self._to_string(to_topic)
        reply_topic = self._to_string(reply_topic)

        request, transaction_id, response_required = \
            self._format_request(
                rpc=rpc,
                to_topic=to_topic,
                reply_topic=reply_topic,
                **kwargs)

        if request is None:
            return

        # Add the transaction to the transaction map before sending the
        # request. This will guarantee the eventual response will be
        # processed.
        wait_for_result = None
        transaction_key = None
        if response_required:
            wait_for_result = Deferred()
            transaction_key = self._to_string(request.header.id)
            self.transaction_id_deferred_map[transaction_key] = \
                wait_for_result

        res = None
        try:
            log.debug("message-send", transaction_id=transaction_id,
                      to_topic=to_topic, from_topic=reply_topic, rpc=rpc)
            yield self._send_kafka_message(to_topic, request)
            log.debug("message-sent", transaction_id=transaction_id,
                      to_topic=to_topic, from_topic=reply_topic, rpc=rpc)

            if response_required:
                res = yield wait_for_result
        finally:
            # Always drop the pending transaction, including when sending
            # or waiting fails, so the map cannot accumulate dead entries.
            if response_required:
                self.transaction_id_deferred_map.pop(transaction_key, None)

        if response_required:
            if res is not None:
                if res.success:
                    log.debug("send-message-response",
                              transaction_id=transaction_id, rpc=rpc)
                    if callback:
                        callback((res.success, res.result))
                    else:
                        returnValue((res.success, res.result))
                else:
                    # this is the case where the core API returns a grpc
                    # code.NotFound. Return or callback so the caller can
                    # act appropriately (i.e add whatever was not found)
                    log.warn("send-message-response-error-result",
                             transaction_id=transaction_id, rpc=rpc,
                             kafka_request=request, kafka_result=res)
                    if callback:
                        callback((res.success, None))
                    else:
                        returnValue((res.success, None))
            else:
                raise KafkaMessagingError(
                    error="failed-response-for-request:{}".format(request))

    except KafkaMessagingError as e:
        # Already the right type: log and re-raise as is, without wrapping
        # it inside a second KafkaMessagingError.
        log.exception("Exception-sending-request", e=e)
        raise
    except Exception as e:
        log.exception("Exception-sending-request", e=e)
        raise KafkaMessagingError(error=e)
591
592
# Common method to get the singleton instance of the kafka proxy class
def get_messaging_proxy():
    """
    :return: the IKafkaMessagingProxy instance registered by start(), or
    None if none has been registered yet
    """
    instance = IKafkaMessagingProxy._kafka_messaging_instance
    return instance