# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


#
# Copyright 2015 Cisco Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

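# The publisher is configured through its publisher URL; __init__ below
# parses the netloc plus the query parameters topic, policy,
# max_queue_length and max_retry. A representative URL (a sketch, with the
# defaults from __init__ spelled out):
#
#     kafka://localhost:9092/?topic=ceilometer&policy=default
#         &max_queue_length=1024&max_retry=100
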
import json
import logging as LOG

import kafka
from kafka import TopicPartition
from oslo_config import cfg
from oslo_utils import netutils
from six.moves.urllib import parse as urlparse

from ceilometer.publisher import utils


class KafkaBrokerPublisher(object):
    def __init__(self, parsed_url):
        self.kafka_client = None
        self.kafka_server = None

        self.host, self.port = netutils.parse_host_port(
            parsed_url.netloc, default_port=9092)

        self.local_queue = []

        params = urlparse.parse_qs(parsed_url.query)
        self.topic = params.get('topic', ['ceilometer'])[-1]
        self.policy = params.get('policy', ['default'])[-1]
        self.max_queue_length = int(params.get(
            'max_queue_length', [1024])[-1])
        self.max_retry = int(params.get('max_retry', [100])[-1])

        if self.policy in ['default', 'drop', 'queue']:
            LOG.info('Publishing policy set to %s', self.policy)
        else:
            LOG.warning('Publishing policy is unknown (%s), forcing it '
                        'to default', self.policy)
            self.policy = 'default'

        try:
            self._get_client()
            self._get_server()
        except Exception as e:
            LOG.exception("Failed to connect to Kafka service: %s", e)

    def publish_samples(self, context, samples):
        """Send a metering message to the Kafka broker.

        :param context: Execution context from the service or RPC call
        :param samples: Samples from the pipeline after transformation
        """
        samples_list = [
            utils.meter_message_from_counter(
                sample, cfg.CONF.publisher.telemetry_secret)
            for sample in samples
        ]

        self.local_queue.append(samples_list)

        self._check_kafka_connection()

        self.flush()

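    # Each publish call appends one batch (a list of serialized messages) to
    # local_queue; flush() then tries to drain every pending batch and, under
    # the 'queue' policy, keeps whatever _process_queue could not send.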
    def flush(self):
        queue = self.local_queue
        self.local_queue = self._process_queue(queue)
        if self.policy == 'queue':
            self._check_queue_length()

    def publish_events(self, context, events):
        """Send an event message to the Kafka broker.

        :param context: Execution context from the service or RPC call
        :param events: Events from the pipeline after transformation
        """
        events_list = [utils.message_from_event(
            event, cfg.CONF.publisher.telemetry_secret) for event in events]

        self.local_queue.append(events_list)

        self._check_kafka_connection()

        self.flush()

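    # Drain the queue in FIFO order. On a send failure the outcome depends on
    # the policy: 'queue' keeps the unsent batches for the next flush, 'drop'
    # discards them, and 'default' retries the same batch up to max_retry
    # times before raising.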
    def _process_queue(self, queue):
        current_retry = 0
        while queue:
            data = queue[0]
            try:
                self._send(data)
            except Exception:
                LOG.warning("Failed to publish %d datapoints",
                            sum([len(d) for d in queue]))
                if self.policy == 'queue':
                    return queue
                elif self.policy == 'drop':
                    return []
                current_retry += 1
                if current_retry >= self.max_retry:
                    self.local_queue = []
                    LOG.exception("Failed to send sample data after "
                                  "max_retry attempts")
                    raise
            else:
                queue.pop(0)
        return []

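    # Bound the local queue: when more than max_queue_length batches are
    # pending, drop the oldest ones.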
    def _check_queue_length(self):
        queue_length = len(self.local_queue)
        if queue_length > self.max_queue_length > 0:
            diff = queue_length - self.max_queue_length
            self.local_queue = self.local_queue[diff:]
            LOG.warning("Kafka Publisher max local queue length is "
                        "exceeded, dropping %d oldest batches", diff)

    def _check_kafka_connection(self):
        try:
            self._get_client()
        except Exception as e:
            LOG.exception("Failed to connect to Kafka service: %s", e)

            if self.policy == 'queue':
                self._check_queue_length()
            else:
                self.local_queue = []
            raise Exception('Kafka client is not available, '
                            'please restart the Kafka client')

    def _get_client(self):
        if not self.kafka_client:
            self.kafka_client = kafka.KafkaClient(
                "%s:%s" % (self.host, self.port))
            self.kafka_producer = kafka.SimpleProducer(self.kafka_client)

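    # The consumer side attaches to partition 0 of the configured topic and
    # seeks to the end of the log, so subsequent reads only see messages
    # published after this point.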
    def _get_server(self):
        if not self.kafka_server:
            self.kafka_server = kafka.KafkaClient(
                "%s:%s" % (self.host, self.port))
            self.kafka_consumer = kafka.KafkaConsumer(
                bootstrap_servers=["%s:%s" % (self.host, self.port)])
            self.kafka_consumer.assign([TopicPartition(self.topic, 0)])
            self.kafka_consumer.seek_to_end()

    def _send(self, data):
        try:
            self.kafka_producer.send_messages(
                self.topic, json.dumps(data))
        except Exception as e:
            LOG.exception("Failed to send sample data: %s", e)
            raise
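

if __name__ == '__main__':
    # Minimal smoke test, a sketch rather than part of the publisher itself:
    # it assumes a Kafka broker is reachable at localhost:9092 and sends one
    # hand-built JSON payload to the configured topic.
    url = urlparse.urlparse(
        'kafka://localhost:9092/?topic=ceilometer&policy=drop')
    publisher = KafkaBrokerPublisher(url)
    publisher._send([{'counter_name': 'demo.counter', 'counter_volume': 1}])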