blob: f813834468b5c6ddabf6c43712fec3ae684e4d4d [file] [log] [blame]
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -08001#!/usr/bin/env python
Zack Williams41513bf2018-07-07 20:08:35 -07002# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -080015
16"""
17A simple process to read time-series samples from a kafka topic and shove
18the data into graphite/carbon as pickled input.
19
20The code is based on a github/gist by phobos182
21(https://gist.github.com/phobos182/3931936).
22
23As with all GitHub gists, it is covered by the MIT license.
24
25"""
26
27from optparse import OptionParser
28
29import simplejson
Zsolt Haraszti1b7c0362016-12-12 09:45:47 -080030import structlog
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -080031from kafka import KafkaConsumer
32import pickle
33import struct
34import socket
35import sys
36import time
37
38from kafka.consumer.fetcher import ConsumerRecord
39from kafka.errors import KafkaError
40
41from common.utils.consulhelpers import get_endpoint_from_consul
42
43
Zsolt Haraszti1b7c0362016-12-12 09:45:47 -080044log = structlog.get_logger()
45
46
class Graphite:
    """TCP client for a Graphite/carbon endpoint with retry and backoff.

    The constructor connects immediately. Connection and send failures are
    retried up to ``retry`` times, sleeping ``delay`` seconds between
    attempts and multiplying the delay by ``backoff`` after each failure.
    When the retries are exhausted a plain ``Exception('Timeout')`` is raised.
    """

    def __init__(self, host='localhost', port=2004, retry=5, delay=3,
                 backoff=2, timeout=10):
        """Store the connection settings and connect to ``host:port``.

        :param host: Graphite host name or address.
        :param port: Graphite TCP port.
        :param retry: number of attempts before giving up.
        :param delay: initial sleep (seconds) between attempts.
        :param backoff: multiplier applied to the delay after each failure.
        :param timeout: socket timeout in seconds.
        :raises Exception: ``'Timeout'`` if no connection could be made.
        """
        self.host = host
        self.port = port
        self.retry = retry
        self.delay = delay
        self.backoff = backoff
        self.timeout = timeout

        # The socket is created lazily by connect(); None means "no usable
        # socket at the moment".
        self.conn = None
        # Initiate connection
        self.connect()

    def _new_socket(self):
        """Return a fresh, unconnected TCP socket with the timeout applied."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        return sock

    def _backoff(self, retry, delay, backoff):
        """Consume one retry, sleep ``delay`` seconds and grow the delay.

        :returns: the updated ``(retry, delay, backoff)`` triple.
        :raises Exception: ``'Timeout'`` when no retries are left.
        """
        retry -= 1
        if retry <= 0:
            raise Exception('Timeout')
        time.sleep(delay)
        delay *= backoff
        return retry, delay, backoff

    def _retry(self, exception, func, *args):
        """
        Retry calling the func catching a tuple of exceptions with backoff.

        :returns: whatever ``func(*args)`` returns on its first success.
        :raises Exception: ``'Timeout'`` when the retries are exhausted.
        """
        retry = self.retry
        delay = self.delay
        backoff = self.backoff
        while retry > 0:
            try:
                return func(*args)
            except exception:
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def connect(self):
        """Connect to graphite, retrying with backoff on socket errors.

        :raises Exception: ``'Timeout'`` when the retries are exhausted.
        """
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            # A socket that was closed or that failed to connect cannot be
            # reused, so every attempt starts from a usable socket object.
            if self.conn is None:
                self.conn = self._new_socket()
            try:
                # Attempt to connect to Graphite, break if success
                self.conn.connect((self.host, self.port))
                break
            except socket.error:
                # Ditch this socket; the next iteration creates a new one.
                self.close()
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def close(self):
        """Close connection to Graphite. Safe to call more than once."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def send(self, data, retry=None):
        """Send data to graphite, reconnecting and retrying on failure.

        :param data: the bytes to write to the socket.
        :param retry: number of attempts; defaults to the instance's
            ``retry`` setting when omitted.
        :raises Exception: ``'Timeout'`` when the retries are exhausted.
        """
        if retry is None:
            retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            # No socket (e.g. after close()): re-establish before sending.
            if self.conn is None:
                self.connect()
            try:
                # Send data to socket
                self.conn.sendall(data)
                break
            except socket.error:
                # The connection is in an unknown state after a failed
                # sendall; start over on a fresh connection and resend the
                # whole payload.
                self.close()
                self.connect()
                retry, delay, backoff = self._backoff(retry, delay, backoff)
134
135
136def _pickle(batch):
137 """Pickle metrics into graphite format."""
138 payload = pickle.dumps(batch)
139 header = struct.pack("!L", len(payload))
140 message = header + payload
141 return message
142
143
144def _convert(msg):
145 """Convert a graphite key value string to pickle."""
146
Zsolt Haraszti0778a242017-01-18 01:11:54 -0800147 def extract_slice(ts, prefixes):
148 for object_path, metrics in prefixes.iteritems():
149 for metric_name, value in metrics['metrics'].iteritems():
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -0800150 path = '.'.join((object_path, metric_name))
151 yield (path, ts, value)
152
153 assert isinstance(msg, dict)
154 type = msg.get('type')
155 if type == 'slice':
Zsolt Haraszti0778a242017-01-18 01:11:54 -0800156 extractor, kw = extract_slice, dict(ts=msg['ts'],
157 prefixes=msg['prefixes'])
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -0800158 else:
159 raise Exception('Unknown format')
160
161 batch = []
162 for path, timestamp, value in extractor(**kw):
163 batch.append((path, (timestamp, value)))
164 return batch
165
166
if __name__ == "__main__":
    # Script entry point: parse the command line, connect to Graphite and
    # Kafka, then forward every consumed sample batch to Graphite.

    parser = OptionParser()
    parser.add_option("-K", "--kafka", dest="kafka",
                      default="localhost:9092", help="Kafka bootstrap server")
    parser.add_option("-c", "--consul", dest="consul",
                      default="localhost:8500",
                      help="Consul server (needed if kafka server is "
                           "specified with '@kafka' value)")
    parser.add_option("-t", "--topic", dest="topic", help="Kafka topic")
    parser.add_option("-H", "--host", dest="graphite_host",
                      default="localhost", help="Graphite host")
    parser.add_option("-p", "--port", dest="graphite_port", type=int,
                      default=2004, help="Graphite port")

    (options, args) = parser.parse_args()

    # Assign OptParse variables
    kafka = options.kafka
    consul = options.consul
    topic = options.topic
    host = options.graphite_host
    port = options.graphite_port

    # The topic has no default; fail with a usage message up front instead
    # of failing later inside the Kafka client.
    if not topic:
        parser.error("Kafka topic is required (-t/--topic)")

    # Connect to Graphite
    try:
        graphite = Graphite(host, port)
    except socket.gaierror:
        # gaierror is a subclass of socket.error, so it must be handled
        # first to be reachable.
        print("Invalid hostname for graphite host %s" % (host))
        sys.exit(1)
    except socket.error:
        print("Could not connect to graphite host %s:%s" % (host, port))
        sys.exit(1)
    except Exception as e:
        # Graphite raises a plain Exception once its retries are exhausted.
        log.error('failed-to-connect-to-graphite', host=host, port=port, e=e)
        sys.exit(1)
    log.info('Connected to graphite at {}:{}'.format(host, port))

    # Resolve Kafka value if it is based on consul lookup
    if kafka.startswith('@'):
        kafka = get_endpoint_from_consul(consul, kafka[1:])

    # Connect to Kafka
    try:
        log.info('connect-to-kafka', kafka=kafka)
        consumer = KafkaConsumer(topic, bootstrap_servers=kafka)
    except KafkaError as e:
        log.error('failed-to-connect-to-kafka', kafka=kafka, e=e)
        sys.exit(1)

    # Consume Kafka topic
    log.info('start-loop', topic=topic)
    for record in consumer:
        assert isinstance(record, ConsumerRecord)
        msg = record.value

        try:
            batch = _convert(simplejson.loads(msg))
        except Exception as e:
            # Skip messages that cannot be decoded or converted, but keep
            # the reason in the log.
            log.warn('unknown-format', msg=msg, e=e)
            continue

        pickled = _pickle(batch)
        graphite.send(pickled)
        log.debug('sent', batch_len=len(batch))

    log.info('exited')