blob: 89b33851ad6bfe58206ea453d7e432a4a27337cb [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#!/usr/bin/env python
2#
3# Copyright 2017 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""
19A gateway between the internal event bus and the Kafka publisher proxy
20to publish select topics and messages posted to the Voltha-internal event
21bus toward the external world.
22"""
23import structlog
24from google.protobuf.json_format import MessageToDict
25from google.protobuf.message import Message
26from simplejson import dumps
27
28from python.common.event_bus import EventBusClient
29
30log = structlog.get_logger()
31
32
class EventBusPublisher(object):
    """Bridge selected internal event-bus topics to Kafka.

    The ``topic_mappings`` section of the config maps an event-bus topic
    name to a dict that is expected to carry a ``kafka_topic`` key. For
    every such entry a subscription is registered on the event bus, and
    each message received on it is handed to
    ``kafka_proxy.send_message(kafka_topic, msg)``.
    """

    def __init__(self, kafka_proxy, config):
        """Store collaborators; nothing is subscribed until ``start()``.

        :param kafka_proxy: object exposing ``send_message(topic, msg)``.
        :param config: mapping read via ``.get``; a missing
            ``topic_mappings`` entry is treated as an empty dict.
        """
        self.kafka_proxy = kafka_proxy
        self.config = config
        self.topic_mappings = config.get('topic_mappings', {})
        self.event_bus = EventBusClient()
        # None until start() is called; afterwards a list of the handles
        # returned by event_bus.subscribe().
        self.subscriptions = None

    def start(self):
        """Register all configured subscriptions and return ``self``."""
        log.debug('starting')
        self.subscriptions = list()
        self._setup_subscriptions(self.topic_mappings)
        log.info('started')
        return self

    def stop(self):
        """Unsubscribe every registered subscription (best effort).

        Any exception raised while unsubscribing is logged and swallowed
        so that shutdown of the caller is never interrupted.
        """
        try:
            log.debug('stopping-event-bus')
            # subscriptions is None when start() was never called.
            if self.subscriptions:
                for subscription in self.subscriptions:
                    self.event_bus.unsubscribe(subscription)
            log.info('stopped-event-bus')
        except Exception as e:
            log.exception('failed-stopping-event-bus', e=e)

    def _setup_subscriptions(self, mappings):
        """Subscribe to each event-bus topic that has a Kafka target.

        :param mappings: dict of ``event_bus_topic -> mapping`` where
            ``mapping`` is read via ``.get('kafka_topic')``. Entries
            without a ``kafka_topic`` are logged as errors and skipped.
        """
        # .items() (rather than the Python-2-only .iteritems()) keeps
        # this working on both Python 2 and Python 3.
        for event_bus_topic, mapping in mappings.items():

            kafka_topic = mapping.get('kafka_topic')

            if kafka_topic is None:
                log.error('no-kafka-topic-in-config',
                          event_bus_topic=event_bus_topic,
                          mapping=mapping)
                continue

            self.subscriptions.append(self.event_bus.subscribe(
                event_bus_topic,
                # to avoid Python late-binding to the last registered
                # kafka_topic, we force instant binding with the default
                # arg; the callback's first argument is ignored.
                lambda _, m, k=kafka_topic: self.forward(k, m)))

            log.info('event-to-kafka', kafka_topic=kafka_topic,
                     event_bus_topic=event_bus_topic)

    def forward(self, kafka_topic, msg):
        """Send ``msg`` to ``kafka_topic`` through the Kafka proxy.

        Protobuf messages are converted to a JSON string first; any other
        payload is passed through unchanged. Failures are logged and
        swallowed so that one bad message does not propagate an exception
        to the caller.

        :param kafka_topic: destination Kafka topic.
        :param msg: protobuf ``Message`` or an already-serialized payload.
        """
        try:
            # convert to JSON string if msg is a protobuf msg
            if isinstance(msg, Message):
                # NOTE(review): the two positional flags presumably map to
                # including_default_value_fields and
                # preserving_proto_field_name in the protobuf 3.x
                # signature -- confirm against the installed version.
                msg = dumps(MessageToDict(msg, True, True))
            log.debug('forward-event-bus-publisher')
            self.kafka_proxy.send_message(kafka_topic, msg)
        except Exception as e:
            log.exception('failed-forward-event-bus-publisher', e=e)
90