# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


#
# Copyright 2015 Cisco Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
31
32import kafka
33from oslo_log import log
34from oslo_serialization import jsonutils
35from oslo_utils import netutils
36from six.moves.urllib import parse as urlparse
37
38from ceilometer.i18n import _LE
39from ceilometer.publisher import messaging
40
# Module-level logger named after this module, per oslo.log convention.
LOG = log.getLogger(__name__)

43
class KafkaBrokerPublisher(messaging.MessagingPublisher):
    """Publish metering data to a Kafka broker.

    The ip address and port number of the Kafka broker should be configured
    in the ceilometer pipeline configuration file. If an ip address is not
    specified, this Kafka publisher will not publish any meters.

    To enable this publisher, add the following section to the
    /etc/ceilometer/pipeline.yaml file or simply add it to an existing
    pipeline::

        meter:
            - name: meter_kafka
            interval: 600
            counters:
                - "*"
            transformers:
            sinks:
                - kafka_sink
        sinks:
            - name: kafka_sink
            transformers:
            publishers:
                - kafka://[kafka_broker_ip]:[kafka_broker_port]?topic=[topic]

    Kafka topic name and broker's port are required for this publisher to
    work properly. If the topic parameter is missing, this Kafka publisher
    publishes metering data under the topic name 'ceilometer'. If the port
    number is not specified, this Kafka publisher will use 9092 as the
    broker's port. This publisher has transmit options such as queue, drop,
    and retry. These options are specified using the policy field of the URL
    parameter. When the queue option is selected, the local queue length can
    be bounded using the max_queue_length field as well. When the transfer
    fails with the retry option, the data is resent as many times as
    specified in the max_retry field. If max_retry is not specified, the
    default number of retries is 100.
    """

    def __init__(self, parsed_url):
        """Initialize the publisher from a kafka:// publisher URL.

        :param parsed_url: urlparse result for the publisher URL; the netloc
                           carries host[:port] and the query string carries
                           the 'topic' and 'max_retry' options.
        """
        super(KafkaBrokerPublisher, self).__init__(parsed_url)
        options = urlparse.parse_qs(parsed_url.query)

        # The producer is created lazily in _ensure_connection() so that a
        # broker being down does not break publisher construction.
        self._producer = None
        self._host, self._port = netutils.parse_host_port(
            parsed_url.netloc, default_port=9092)
        # parse_qs yields a list per key; the last occurrence wins when a
        # query parameter is repeated.
        self._topic = options.get('topic', ['ceilometer'])[-1]
        self.max_retry = int(options.get('max_retry', [100])[-1])

    def _ensure_connection(self):
        """Create the Kafka producer on first use.

        :raises messaging.DeliveryFailure: if the broker cannot be reached.
        """
        if self._producer:
            return

        try:
            self._producer = kafka.KafkaProducer(
                bootstrap_servers=["%s:%s" % (self._host, self._port)])
        except Exception as e:
            # kafka.errors.KafkaError subclasses Exception, so one handler
            # covers both cases the original code duplicated verbatim.
            LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
            raise messaging.DeliveryFailure('Kafka Client is not available, '
                                            'please restart Kafka client')

    def _send(self, context, event_type, data):
        """Serialize each sample to JSON and send it to the Kafka topic.

        :param context: request context (unused here, required by the base
                        class interface)
        :param event_type: event type (unused here, required by the base
                           class interface)
        :param data: iterable of samples to publish, one message each
        :raises messaging.DeliveryFailure: wrapping any send error.
        """
        self._ensure_connection()
        # TODO(sileht): don't split the payload into multiple network
        # message ... but how to do that without breaking consuming
        # application...
        # NOTE(review): jsonutils.dumps returns text; KafkaProducer has no
        # value_serializer configured, which would reject str on Python 3 —
        # presumably this runs on Python 2 where str is bytes. Confirm
        # before porting.
        try:
            for d in data:
                self._producer.send(self._topic, jsonutils.dumps(d))
        except Exception as e:
            messaging.raise_delivery_failure(e)