blob: 2c61239113180b6e7a806c61007b9f61aaa3b198 [file] [log] [blame]
# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15""" XOSKafkaProducer """
16
17import confluent_kafka
18
19from xosconfig import Config
20from multistructlog import create_logger
Zack Williams045b63d2019-01-22 16:30:57 -070021
# Shared Kafka producer connection; stays None until XOSKafkaProducer.init()
# successfully creates a confluent_kafka.Producer.
kafka_producer = None
# Module-level logger; created by XOSKafkaProducer.init() on its first call.
log = None
24
25
class XOSKafkaProducer:
    """ XOSKafkaProducer
    Wrapper to share a single Kafka Producer connection.

    The connection is kept in the module-level ``kafka_producer`` global
    rather than on instances: call ``init()`` once, then use ``produce()``.
    """

    @staticmethod
    def init():
        """Create the module logger (if needed) and the shared Kafka producer.

        The broker list is read from the ``kafka_bootstrap_servers`` config
        entry and joined with commas to form ``bootstrap.servers``.

        A ``confluent_kafka.KafkaError`` raised while creating the producer is
        logged and swallowed, which leaves ``kafka_producer`` as it was.

        Raises:
            Exception: if the shared producer has already been initialized.
        """
        global log
        global kafka_producer

        if not log:
            log = create_logger(Config().get("logging"))

        # Guard clause: only one shared producer may exist per process.
        if kafka_producer:
            raise Exception("XOSKafkaProducer already initialized")

        log.info(
            "Connecting to Kafka with bootstrap servers: %s"
            % Config.get("kafka_bootstrap_servers")
        )

        try:
            producer_config = {
                "bootstrap.servers": ",".join(Config.get("kafka_bootstrap_servers"))
            }

            kafka_producer = confluent_kafka.Producer(**producer_config)

            log.info("Connected to Kafka: %s" % kafka_producer)

        except confluent_kafka.KafkaError as e:
            log.exception("Kafka Error: %s" % e)

    @classmethod
    def produce(cls, topic, key, value):
        """Queue a message on the shared producer.

        Args:
            topic: Kafka topic to publish to.
            key: message key.
            value: message payload.

        ``init()`` must have created the producer first; otherwise
        ``kafka_producer`` is still ``None`` and the call fails with an
        ``AttributeError``. A ``confluent_kafka.KafkaError`` is logged and
        swallowed.
        """
        try:
            # Positional order expected by the producer is (topic, value, key).
            kafka_producer.produce(
                topic, value, key, callback=cls._kafka_delivery_callback
            )

            # Non-blocking poll so queued delivery callbacks get served.
            # see https://github.com/confluentinc/confluent-kafka-python/issues/16
            kafka_producer.poll(0)

        except confluent_kafka.KafkaError as err:
            # Format the error into the message (same style as init()); the
            # message has no placeholder, so passing err as a separate
            # positional argument would not render it.
            log.exception("Kafka Error: %s" % err)

    def __del__(self):
        """Flush the shared producer when an instance is garbage-collected.

        Only runs if something instantiates this class; ``init()`` and
        ``produce()`` themselves never create an instance.
        """
        if kafka_producer is not None:
            kafka_producer.flush()

    @staticmethod
    def _kafka_delivery_callback(err, msg):
        """Delivery callback: log failed deliveries, ignore successes.

        Args:
            err: delivery error, falsy when the message was delivered.
            msg: the message the report refers to (unused).
        """
        if err:
            log.error("Message failed delivery: %s" % err)