blob: 5d4681049f643af66934bf925085b3c6226c6b75 [file] [log] [blame]
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -08001#!/usr/bin/env python
2
3"""
4A simple process to read time-series samples from a kafka topic and shove
5the data into graphite/carbon as pickled input.
6
7The code is based on a github/gist by phobos182
8(https://gist.github.com/phobos182/3931936).
9
10Like all GitHub gists, it is covered by the MIT license.
11
12"""
13
14from optparse import OptionParser
15
16import simplejson
Zsolt Haraszti1b7c0362016-12-12 09:45:47 -080017import structlog
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -080018from kafka import KafkaConsumer
19import pickle
20import struct
21import socket
22import sys
23import time
24
25from kafka.consumer.fetcher import ConsumerRecord
26from kafka.errors import KafkaError
27
28from common.utils.consulhelpers import get_endpoint_from_consul
29
30
Zsolt Haraszti1b7c0362016-12-12 09:45:47 -080031log = structlog.get_logger()
32
33
class Graphite(object):
    """TCP client that pushes already-serialized data to a Graphite host.

    The connection is opened in the constructor. Failed connection and
    send attempts are retried with an exponentially growing delay until
    the retry budget is used up, at which point ``Exception('Timeout')``
    is raised.
    """

    def __init__(self, host='localhost', port=2004, retry=5, delay=3,
                 backoff=2, timeout=10):
        """Store the settings and connect immediately.

        :param host: host name or address to connect to.
        :param port: TCP port to connect to.
        :param retry: retry budget for connect/send (see ``_backoff``).
        :param delay: seconds to sleep before the first retry.
        :param backoff: factor the delay is multiplied by after each retry.
        :param timeout: socket timeout in seconds.
        :raises Exception: ``'Timeout'`` when the retry budget is used up.
        """
        self.host = host
        self.port = port
        self.retry = retry
        self.delay = delay
        self.backoff = backoff
        self.timeout = timeout

        # Create initial socket
        self.conn = self._new_socket()
        # Initiate connection
        self.connect()

    def _new_socket(self):
        """Return a fresh, unconnected TCP socket with the timeout applied."""
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.settimeout(self.timeout)
        return conn

    def _backoff(self, retry, delay, backoff):
        """Consume one retry, sleep ``delay`` seconds and grow the delay.

        :returns: the updated ``(retry, delay, backoff)`` triple.
        :raises Exception: ``'Timeout'`` when no retries are left; in that
            case no sleep is performed.
        """
        retry -= 1
        if retry == 0:
            raise Exception('Timeout')
        time.sleep(delay)
        delay *= backoff
        return retry, delay, backoff

    def _retry(self, exception, func, *args):
        """Call ``func(*args)`` until it succeeds, backing off on failure.

        :param exception: exception class (or tuple of classes) that
            triggers a retry; anything else propagates unchanged.
        :returns: whatever ``func`` returns on the first successful call.
        :raises Exception: ``'Timeout'`` when the retry budget is used up.
        """
        retry = self.retry
        delay = self.delay
        backoff = self.backoff
        while retry > 0:
            try:
                return func(*args)
            except exception:
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def connect(self):
        """Connect ``self.conn`` to ``(self.host, self.port)`` with retries.

        :raises Exception: ``'Timeout'`` when the retry budget is used up.
        """
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            try:
                # Attempt to connect to Graphite, break if success
                self.conn.connect((self.host, self.port))
                break
            except socket.error:
                # A socket whose connect() failed (or that was closed) is
                # not reused: ditch it and retry on a brand new one.
                self.conn.close()
                self.conn = self._new_socket()
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def close(self):
        """Close the connection to Graphite."""
        self.conn.close()

    def send(self, data, retry=3):
        """Send ``data`` to Graphite, reconnecting and retrying on error.

        :param data: bytes to write to the socket.
        :param retry: accepted for backward compatibility but ignored; the
            instance-wide ``self.retry`` budget is always used.
        :raises Exception: ``'Timeout'`` when the retry budget is used up.
        """
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            try:
                # sendall() either writes everything or raises; after an
                # error the whole message is resent on the new connection.
                self.conn.sendall(data)
                break
            except socket.error:
                # The old socket cannot be reconnected once closed, so
                # replace it before dialing again.
                self.close()
                self.conn = self._new_socket()
                self.connect()
                retry, delay, backoff = self._backoff(retry, delay, backoff)
121
122
def _pickle(batch):
    """Serialize ``batch`` and frame it with a length header.

    The result is a 4-byte unsigned big-endian length (``"!L"``) followed
    by the pickled ``batch`` of that many bytes.
    """
    body = pickle.dumps(batch)
    return struct.pack("!L", len(body)) + body
129
130
def _convert(msg):
    """Convert a decoded metrics message into a list of datapoints.

    Only messages whose ``'type'`` is ``'slice'`` are understood. Such a
    message carries a timestamp under ``'ts'`` and, under ``'prefixes'``,
    a dict mapping an object path to a dict whose ``'metrics'`` entry maps
    metric names to values.

    :param msg: the decoded message (a dict).
    :returns: a list of ``(path, (timestamp, value))`` tuples, where
        ``path`` is ``'<object_path>.<metric_name>'``.
    :raises Exception: ``'Unknown format'`` when ``msg`` is not a dict or
        its type is not ``'slice'``.
    :raises KeyError: when a slice message lacks an expected key.
    """

    def extract_slice(ts, prefixes):
        # .items() instead of .iteritems() so this runs on Python 2 and 3.
        for object_path, metrics in prefixes.items():
            for metric_name, value in metrics['metrics'].items():
                yield '.'.join((object_path, metric_name)), ts, value

    # Explicit check rather than ``assert`` so it survives ``python -O``.
    if not isinstance(msg, dict):
        raise Exception('Unknown format')

    msg_type = msg.get('type')
    if msg_type != 'slice':
        raise Exception('Unknown format')

    return [
        (path, (timestamp, value))
        for path, timestamp, value in extract_slice(ts=msg['ts'],
                                                    prefixes=msg['prefixes'])
    ]
152
153
if __name__ == "__main__":
    # Script entry point: read JSON records from a Kafka topic, convert
    # each one with _convert(), frame it with _pickle() and push it to
    # Graphite.

    parser = OptionParser()
    parser.add_option("-K", "--kafka", dest="kafka",
                      default="localhost:9092", help="Kafka bootstrap server")
    parser.add_option("-c", "--consul", dest="consul",
                      default="localhost:8500",
                      help="Consul server (needed if kafka server is "
                           "specified with '@kafka' value)")
    parser.add_option("-t", "--topic", dest="topic", help="Kafka topic")
    parser.add_option("-H", "--host", dest="graphite_host",
                      default="localhost", help="Graphite host")
    parser.add_option("-p", "--port", dest="graphite_port", type=int,
                      default=2004, help="Graphite port")

    (options, args) = parser.parse_args()

    # Assign OptParse variables
    kafka = options.kafka
    consul = options.consul
    topic = options.topic
    host = options.graphite_host
    port = options.graphite_port

    # --topic has no default; fail fast with a usage error instead of
    # handing None to the Kafka consumer.
    if not topic:
        parser.error("a Kafka topic is required (-t/--topic)")

    # Connect to Graphite
    try:
        graphite = Graphite(host, port)
    except socket.gaierror:
        # gaierror is a subclass of socket.error, so it has to be handled
        # first or this branch can never be reached.
        print("Invalid hostname for graphite host %s" % (host))
        sys.exit(1)
    except socket.error:
        print("Could not connect to graphite host %s:%s" % (host, port))
        sys.exit(1)
    log.info('Connected to graphite at {}:{}'.format(host, port))

    # Resolve Kafka value if it is based on consul lookup
    if kafka.startswith('@'):
        kafka = get_endpoint_from_consul(consul, kafka[1:])

    # Connect to Kafka
    try:
        log.info('connect-to-kafka', kafka=kafka)
        consumer = KafkaConsumer(topic, bootstrap_servers=kafka)
    except KafkaError as e:
        log.error('failed-to-connect-to-kafka', kafka=kafka, e=e)
        sys.exit(1)

    # Consume Kafka topic
    log.info('start-loop', topic=topic)
    for record in consumer:
        assert isinstance(record, ConsumerRecord)
        msg = record.value

        try:
            batch = _convert(simplejson.loads(msg))
        except Exception as e:
            # Best effort: a record that cannot be decoded or converted is
            # logged (with the reason) and skipped, not fatal.
            log.warn('unknown-format', msg=msg, e=e)
            continue

        pickled = _pickle(batch)
        graphite.send(pickled)
        log.debug('sent', batch_len=len(batch))

    log.info('exited')
217 log.info('exited')