blob: 241d87f24c3df8bc1e2407e11f38b0e980d4d87d [file] [log] [blame]
Zack Williams73a12852018-09-05 15:33:35 -07001# Copyright 2018-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15""" XOSKafkaProducer """
16
17import confluent_kafka
18
19from xosconfig import Config
20from multistructlog import create_logger
Zack Williams045b63d2019-01-22 16:30:57 -070021
22log = create_logger(Config().get("logging"))
Zack Williams73a12852018-09-05 15:33:35 -070023
24kafka_producer = None
25
26
27class XOSKafkaProducer:
28 """ XOSKafkaProducer
29 Wrapper to share Kafka Producer connection
30 """
31
32 @staticmethod
33 def init():
34
35 global kafka_producer
36
37 if kafka_producer:
Zack Williams045b63d2019-01-22 16:30:57 -070038 raise Exception("XOSKafkaProducer already initialized")
Zack Williams73a12852018-09-05 15:33:35 -070039
40 else:
Zack Williams045b63d2019-01-22 16:30:57 -070041 log.info(
42 "Connecting to Kafka with bootstrap servers: %s"
43 % Config.get("kafka_bootstrap_servers")
44 )
Zack Williams73a12852018-09-05 15:33:35 -070045
46 try:
47 producer_config = {
Zack Williams045b63d2019-01-22 16:30:57 -070048 "bootstrap.servers": ",".join(Config.get("kafka_bootstrap_servers"))
Zack Williams73a12852018-09-05 15:33:35 -070049 }
50
51 kafka_producer = confluent_kafka.Producer(**producer_config)
52
Zack Williams045b63d2019-01-22 16:30:57 -070053 log.info("Connected to Kafka: %s" % kafka_producer)
Zack Williams73a12852018-09-05 15:33:35 -070054
Zack Williams045b63d2019-01-22 16:30:57 -070055 except confluent_kafka.KafkaError as e:
Zack Williams73a12852018-09-05 15:33:35 -070056 log.exception("Kafka Error: %s" % e)
57
58 @classmethod
59 def produce(cls, topic, key, value):
60
61 try:
62 kafka_producer.produce(
Zack Williams045b63d2019-01-22 16:30:57 -070063 topic, value, key, callback=cls._kafka_delivery_callback
64 )
Zack Williams73a12852018-09-05 15:33:35 -070065
Zack Williamsc2b87bf2018-09-21 13:48:40 -070066 # see https://github.com/confluentinc/confluent-kafka-python/issues/16
67 kafka_producer.poll(0)
68
Zack Williams045b63d2019-01-22 16:30:57 -070069 except confluent_kafka.KafkaError as err:
Zack Williams73a12852018-09-05 15:33:35 -070070 log.exception("Kafka Error", err)
71
Zack Williamsc2b87bf2018-09-21 13:48:40 -070072 def __del__(self):
Zack Williams045b63d2019-01-22 16:30:57 -070073 if kafka_producer is not None:
Zack Williamsc2b87bf2018-09-21 13:48:40 -070074 kafka_producer.flush()
75
Zack Williams73a12852018-09-05 15:33:35 -070076 @staticmethod
77 def _kafka_delivery_callback(err, msg):
78 if err:
Zack Williams045b63d2019-01-22 16:30:57 -070079 log.error("Message failed delivery: %s" % err)
Zack Williams73a12852018-09-05 15:33:35 -070080 else:
Zack Williamsda69db22019-01-29 16:44:52 -070081 pass