blob: ce495fc6a23b062aa61766f125e98683e4ea06a1 [file] [log] [blame]
#
# Copyright 2015 Cisco Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15
16import json
17
18import kafka
19from oslo_config import cfg
20from oslo_utils import netutils
21from six.moves.urllib import parse as urlparse
22import logging as LOG
23
24
class KafkaBrokerPublisher(object):
    """Publish metering samples and events to a Kafka topic.

    The publisher is configured from a parsed URL: the network location
    gives the broker host and port (port defaults to 9092) and the query
    string may carry the following options (the last occurrence wins):

    * ``topic`` - Kafka topic to publish to (default ``ceilometer``).
    * ``policy`` - what to do when sending fails:

      - ``default``: retry the failing batch up to ``max_retry`` times,
        then clear the local queue and re-raise the send error;
      - ``drop``: discard everything that is queued;
      - ``queue``: keep unsent batches in the local queue, trimmed to
        ``max_queue_length`` by dropping the oldest batches.

      Any other value is replaced by ``default``.
    * ``max_queue_length`` - local queue bound for ``queue`` (default 1024).
    * ``max_retry`` - retry bound for ``default`` (default 100).
    """

    _POLICIES = ('default', 'drop', 'queue')

    def __init__(self, parsed_url):
        """Read the configuration from *parsed_url* and try to connect.

        :param parsed_url: URL split result exposing ``netloc`` and ``query``

        A connection failure is logged, not raised; the connection is
        attempted again on the next publish call.
        """
        self.kafka_client = None
        self.kafka_server = None
        # Declared up front so the attributes exist even when the initial
        # connection attempt below fails.
        self.kafka_producer = None
        self.kafka_consumer = None

        self.host, self.port = netutils.parse_host_port(
            parsed_url.netloc, default_port=9092)

        # Each entry is one batch (a list of messages) awaiting delivery.
        self.local_queue = []

        params = urlparse.parse_qs(parsed_url.query)
        self.topic = params.get('topic', ['ceilometer'])[-1]
        self.policy = params.get('policy', ['default'])[-1]
        self.max_queue_length = int(params.get(
            'max_queue_length', [1024])[-1])
        self.max_retry = int(params.get('max_retry', [100])[-1])

        if self.policy in self._POLICIES:
            LOG.info('Publishing policy set to %s', self.policy)
        else:
            LOG.warning('Publishing policy is unknown (%s) force to default',
                        self.policy)
            self.policy = 'default'

        try:
            self._get_client()
            self._get_server()
        except Exception as e:
            LOG.exception("Failed to connect to Kafka service: %s", e)

    def publish_samples(self, context, samples):
        """Send a metering message for kafka broker.

        :param context: Execution context from the service or RPC call
        :param samples: Samples from pipeline after transformation
        :raises Exception: if the Kafka client is unavailable or sending
                           fails under the ``default`` policy
        """
        # NOTE(review): `utils` is not imported in the visible part of this
        # file; presumably the project's publisher utils module - confirm.
        samples_list = [
            utils.meter_message_from_counter(
                sample, cfg.CONF.publisher.telemetry_secret)
            for sample in samples
        ]
        self._enqueue_and_flush(samples_list)

    def publish_events(self, context, events):
        """Send an event message for kafka broker.

        :param context: Execution context from the service or RPC call
        :param events: events from pipeline after transformation
        :raises Exception: if the Kafka client is unavailable or sending
                           fails under the ``default`` policy
        """
        # NOTE(review): `utils` is not imported in the visible part of this
        # file; presumably the project's publisher utils module - confirm.
        events_list = [
            utils.message_from_event(
                event, cfg.CONF.publisher.telemetry_secret)
            for event in events
        ]
        self._enqueue_and_flush(events_list)

    def _enqueue_and_flush(self, batch):
        """Queue *batch*, make sure a client exists, then flush the queue."""
        self.local_queue.append(batch)
        # Propagates the "client not available" error to the caller; the
        # queue has already been adjusted according to the policy.
        self._check_kafka_connection()
        self.flush()

    def flush(self):
        """Try to send every queued batch and keep whatever is left over."""
        queue = self.local_queue
        self.local_queue = self._process_queue(queue)
        if self.policy == 'queue':
            self._check_queue_length()

    def _process_queue(self, queue):
        """Send the batches in *queue* in order, applying the policy.

        :param queue: list of batches; sent batches are removed in place
        :returns: the batches that must stay queued (``[]`` unless the
                  policy is ``queue`` and a send failed)
        :raises Exception: the last send error once ``max_retry`` attempts
                           have failed under the ``default`` policy
        """
        retries = 0
        while queue:
            try:
                self._send(queue[0])
            except Exception:
                LOG.warning("Failed to publish %d datum",
                            sum(len(batch) for batch in queue))
                if self.policy == 'queue':
                    return queue
                if self.policy == 'drop':
                    return []
                # 'default' policy: retry the same batch until exhausted.
                retries += 1
                if retries >= self.max_retry:
                    self.local_queue = []
                    LOG.exception("Failed to retry to send sample data "
                                  "with max_retry times")
                    raise
            else:
                queue.pop(0)
        return []

    def _check_queue_length(self):
        """Trim the local queue to ``max_queue_length``, oldest first.

        A ``max_queue_length`` of zero or less disables trimming.
        """
        queue_length = len(self.local_queue)
        if queue_length > self.max_queue_length > 0:
            diff = queue_length - self.max_queue_length
            self.local_queue = self.local_queue[diff:]
            LOG.warning("Kafka Publisher max local queue length is exceeded, "
                        "dropping %d oldest data", diff)

    def _check_kafka_connection(self):
        """Ensure a Kafka client exists, connecting if necessary.

        On failure the local queue is trimmed (``queue`` policy) or
        emptied (any other policy) before the error is raised.

        :raises Exception: if the client cannot be created
        """
        try:
            self._get_client()
        except Exception as e:
            LOG.exception("Failed to connect to Kafka service: %s", e)

            if self.policy == 'queue':
                self._check_queue_length()
            else:
                self.local_queue = []
            raise Exception('Kafka Client is not available, '
                            'please restart Kafka client')

    def _get_client(self):
        """Create the Kafka client and producer if not already present."""
        if not self.kafka_client:
            client = kafka.KafkaClient("%s:%s" % (self.host, self.port))
            producer = kafka.SimpleProducer(client)
            # Assign only once both objects exist, so a failure while
            # building the producer is retried on the next call.
            self.kafka_client = client
            self.kafka_producer = producer

    def _get_server(self):
        """Create the server-side client and consumer if not present."""
        if not self.kafka_server:
            address = "%s:%s" % (self.host, self.port)
            server = kafka.KafkaClient(address)
            consumer = kafka.KafkaConsumer(
                self.topic, bootstrap_servers=[address])
            # Assign only once both objects exist (see _get_client).
            self.kafka_server = server
            self.kafka_consumer = consumer

    def _send(self, data):
        """Serialize *data* as one JSON message and send it to the topic.

        :param data: JSON-serializable batch of messages
        :raises Exception: whatever the producer raises; it is logged first
        """
        # The producer only accepts bytes payloads; json.dumps emits ASCII
        # by default, so encoding is safe on both Python 2 and Python 3.
        payload = json.dumps(data).encode('utf-8')
        try:
            self.kafka_producer.send_messages(self.topic, payload)
        except Exception as e:
            LOG.exception("Failed to send sample data: %s", e)
            raise