blob: 778486bb8a9b5e6d6a8af7553126be6c7851f6d7 [file] [log] [blame]
rdudyalab086cf32016-08-11 00:07:45 -04001#
2# Copyright 2015 Cisco Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import json
17
18import kafka
19from kafka import TopicPartition
20from oslo_config import cfg
21from oslo_utils import netutils
22from six.moves.urllib import parse as urlparse
23import logging as LOG
24
25
26class KafkaBrokerPublisher():
27 def __init__(self, parsed_url):
28 self.kafka_client = None
29 self.kafka_server = None
30
31 self.host, self.port = netutils.parse_host_port(
32 parsed_url.netloc, default_port=9092)
33
34 self.local_queue = []
35
36 params = urlparse.parse_qs(parsed_url.query)
37 self.topic = params.get('topic', ['ceilometer'])[-1]
38 self.policy = params.get('policy', ['default'])[-1]
39 self.max_queue_length = int(params.get(
40 'max_queue_length', [1024])[-1])
41 self.max_retry = int(params.get('max_retry', [100])[-1])
42
43 if self.policy in ['default', 'drop', 'queue']:
44 LOG.info(('Publishing policy set to %s') % self.policy)
45 else:
46 LOG.warn(('Publishing policy is unknown (%s) force to default')
47 % self.policy)
48 self.policy = 'default'
49
50 try:
51 self._get_client()
52 self._get_server()
53 except Exception as e:
54 LOG.exception("Failed to connect to Kafka service: %s", e)
55
56 def publish_samples(self, context, samples):
57 """Send a metering message for kafka broker.
58
59 :param context: Execution context from the service or RPC call
60 :param samples: Samples from pipeline after transformation
61 """
62 samples_list = [
63 utils.meter_message_from_counter(
64 sample, cfg.CONF.publisher.telemetry_secret)
65 for sample in samples
66 ]
67
68 self.local_queue.append(samples_list)
69
70 try:
71 self._check_kafka_connection()
72 except Exception as e:
73 raise e
74
75 self.flush()
76
77 def flush(self):
78 queue = self.local_queue
79 self.local_queue = self._process_queue(queue)
80 if self.policy == 'queue':
81 self._check_queue_length()
82
83 def publish_events(self, context, events):
84 """Send an event message for kafka broker.
85
86 :param context: Execution context from the service or RPC call
87 :param events: events from pipeline after transformation
88 """
89 events_list = [utils.message_from_event(
90 event, cfg.CONF.publisher.telemetry_secret) for event in events]
91
92 self.local_queue.append(events_list)
93
94 try:
95 self._check_kafka_connection()
96 except Exception as e:
97 raise e
98
99 self.flush()
100
101 def _process_queue(self, queue):
102 current_retry = 0
103 while queue:
104 data = queue[0]
105 try:
106 self._send(data)
107 except Exception:
108 LOG.warn(("Failed to publish %d datum"),
109 sum([len(d) for d in queue]))
110 if self.policy == 'queue':
111 return queue
112 elif self.policy == 'drop':
113 return []
114 current_retry += 1
115 if current_retry >= self.max_retry:
116 self.local_queue = []
117 LOG.exception(("Failed to retry to send sample data "
118 "with max_retry times"))
119 raise
120 else:
121 queue.pop(0)
122 return []
123
124 def _check_queue_length(self):
125 queue_length = len(self.local_queue)
126 if queue_length > self.max_queue_length > 0:
127 diff = queue_length - self.max_queue_length
128 self.local_queue = self.local_queue[diff:]
129 LOG.warn(("Kafka Publisher max local queue length is exceeded, "
130 "dropping %d oldest data") % diff)
131
132 def _check_kafka_connection(self):
133 try:
134 self._get_client()
135 except Exception as e:
136 LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
137
138 if self.policy == 'queue':
139 self._check_queue_length()
140 else:
141 self.local_queue = []
142 raise Exception('Kafka Client is not available, '
143 'please restart Kafka client')
144
145 def _get_client(self):
146 if not self.kafka_client:
147 self.kafka_client = kafka.KafkaClient(
148 "%s:%s" % (self.host, self.port))
149 self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
150
151 def _get_server(self):
152 if not self.kafka_server:
153 self.kafka_server = kafka.KafkaClient(
154 "%s:%s" % (self.host, self.port))
155 #self.kafka_consumer = kafka.KafkaConsumer(self.topic,bootstrap_servers = ["%s:%s" % (self.host, self.port)])
156 self.kafka_consumer=kafka.KafkaConsumer(bootstrap_servers=["%s:%s" % (self.host,self.port)])
157 self.kafka_consumer.assign([TopicPartition(self.topic,0)])
158 self.kafka_consumer.seek_to_end()
159
160 def _send(self, data):
161 #for d in data:
162 try:
163 self.kafka_producer.send_messages(
164 self.topic, json.dumps(data))
165 except Exception as e:
166 LOG.exception(("Failed to send sample data: %s"), e)
167 raise