blob: b4134d588572895f84856316a90f4f08747e6deb [file] [log] [blame]
Zack Williams73a12852018-09-05 15:33:35 -07001# Copyright 2018-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15""" XOSKafkaProducer """
16
17import confluent_kafka
18
19from xosconfig import Config
20from multistructlog import create_logger
21log = create_logger(Config().get('logging'))
22
23kafka_producer = None
24
25
26class XOSKafkaProducer:
27 """ XOSKafkaProducer
28 Wrapper to share Kafka Producer connection
29 """
30
31 @staticmethod
32 def init():
33
34 global kafka_producer
35
36 if kafka_producer:
37 raise Exception('XOSKafkaProducer already initialized')
38
39 else:
40 log.info('Connecting to Kafka with bootstrap servers: %s' %
41 Config.get('kafka_bootstrap_servers'))
42
43 try:
44 producer_config = {
45 'bootstrap.servers':
46 ','.join(Config.get('kafka_bootstrap_servers')),
47 }
48
49 kafka_producer = confluent_kafka.Producer(**producer_config)
50
51 log.info('Connected to Kafka: %s' % kafka_producer)
52
53 except confluent_kafka.KafkaError, e:
54 log.exception("Kafka Error: %s" % e)
55
56 @classmethod
57 def produce(cls, topic, key, value):
58
59 try:
60 kafka_producer.produce(
61 topic,
62 value,
63 key,
64 callback=cls._kafka_delivery_callback
65 )
66
Zack Williamsc2b87bf2018-09-21 13:48:40 -070067 # see https://github.com/confluentinc/confluent-kafka-python/issues/16
68 kafka_producer.poll(0)
69
Zack Williams73a12852018-09-05 15:33:35 -070070 except confluent_kafka.KafkaError, err:
71 log.exception("Kafka Error", err)
72
Zack Williamsc2b87bf2018-09-21 13:48:40 -070073 def __del__(self):
74 if kafka_producer is not None:
75 kafka_producer.flush()
76
Zack Williams73a12852018-09-05 15:33:35 -070077 @staticmethod
78 def _kafka_delivery_callback(err, msg):
79 if err:
80 log.error('Message failed delivery: %s' % err)
81 else:
82 log.trace('Message delivered', message=msg)