blob: 385460132fe5634665042238a2ddf19db5e1ca5a [file] [log] [blame]
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -08001#!/usr/bin/env python
2
3"""
4A simple process to read time-series samples from a kafka topic and shove
5the data into graphite/carbon as pickled input.
6
7The code is based on a github/gist by phobos182
8(https://gist.github.com/phobos182/3931936).
9
10As all GitHub gists, it is covered by the MIT license.
11
12"""
13
14from optparse import OptionParser
15
16import simplejson
Zsolt Haraszti1b7c0362016-12-12 09:45:47 -080017import structlog
Zsolt Harasztiaa4626e2016-12-08 16:53:06 -080018from kafka import KafkaConsumer
19import pickle
20import struct
21import socket
22import sys
23import time
24
25from kafka.consumer.fetcher import ConsumerRecord
26from kafka.errors import KafkaError
27
28from common.utils.consulhelpers import get_endpoint_from_consul
29
30
Zsolt Haraszti1b7c0362016-12-12 09:45:47 -080031log = structlog.get_logger()
32
33
class Graphite:
    """
    Minimal TCP client that pushes already-serialized data to a Graphite
    (carbon) listener, reconnecting with exponential backoff on failure.

    Constructor arguments:
        host, port -- address of the Graphite listener (default port 2004)
        retry      -- maximum number of attempts per operation
        delay      -- initial sleep, in seconds, between attempts
        backoff    -- factor the delay is multiplied by after each attempt
        timeout    -- socket timeout in seconds

    The constructor connects immediately; if every attempt fails it raises
    socket.timeout('Timeout').
    """

    def __init__(self, host='localhost', port=2004, retry=5, delay=3,
                 backoff=2, timeout=10):
        self.host = host
        self.port = port
        self.retry = retry
        self.delay = delay
        self.backoff = backoff
        self.timeout = timeout

        # No socket yet: connect() creates one on demand, so the same code
        # path serves both the initial connection and later reconnects.
        self.conn = None
        # Initiate connection
        self.connect()

    def _new_socket(self):
        """Return a fresh, unconnected TCP socket with the timeout applied."""
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.settimeout(self.timeout)
        return conn

    def _backoff(self, retry, delay, backoff):
        """
        Exponential backoff.

        Consume one retry, sleep for `delay` seconds and return the updated
        (retry, delay, backoff) triple with the delay multiplied by `backoff`.

        Raises socket.timeout('Timeout') once no retries are left. That type
        is a subclass of socket.error (and of Exception), so callers that
        handle socket errors also see retry exhaustion.
        """
        retry -= 1
        if retry <= 0:
            raise socket.timeout('Timeout')
        time.sleep(delay)
        delay *= backoff
        return retry, delay, backoff

    def _retry(self, exception, func, *args):
        """
        Retry calling the func catching a tuple of exceptions with backoff.

        Returns whatever func(*args) returns on the first successful call.
        Returns None without calling func if self.retry is not positive.
        """
        retry = self.retry
        delay = self.delay
        backoff = self.backoff
        while retry > 0:
            try:
                return func(*args)
            except exception:
                # _backoff raises once the retries are used up; raised from
                # inside this handler, it propagates to the caller.
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def connect(self):
        """
        Connect to graphite.

        Tries up to self.retry times, sleeping with exponential backoff
        between attempts. Every failed attempt discards its socket and the
        next attempt uses a brand new one.
        """
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            if self.conn is None:
                self.conn = self._new_socket()
            try:
                # Attempt to connect to Graphite, break if success
                self.conn.connect((self.host, self.port))
                break
            except socket.error:
                # Ditch this socket; the next iteration creates a new one.
                self.close()
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def close(self):
        """
        Close connection to Graphite.

        Safe to call when there is no connection. Afterwards self.conn is
        None, which tells connect() and send() that a new socket is needed.
        """
        if self.conn is not None:
            try:
                self.conn.close()
            finally:
                self.conn = None

    def send(self, data, retry=3):
        """
        Send data to graphite.

        On a socket error the connection is re-established and the whole
        payload is sent again, up to self.retry attempts with backoff.

        NOTE: the `retry` argument is accepted for backward compatibility
        but ignored; the instance-wide self.retry is always used.
        """
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        # Attempt to send any data in the queue
        while retry > 0:
            # Check socket
            if self.conn is None:
                # Attempt to re-establish connection
                self.connect()
                retry, delay, backoff = self._backoff(retry, delay, backoff)
                continue
            try:
                # Send data to socket
                self.conn.sendall(data)
                break
            except socket.error:
                self.close()
                self.connect()
                retry, delay, backoff = self._backoff(retry, delay, backoff)
                continue
121
122
123def _pickle(batch):
124 """Pickle metrics into graphite format."""
125 payload = pickle.dumps(batch)
126 header = struct.pack("!L", len(payload))
127 message = header + payload
128 return message
129
130
131def _convert(msg):
132 """Convert a graphite key value string to pickle."""
133
134 def extract_slice(ts, data):
135 for object_path, metrics in data.iteritems():
136 for metric_name, value in metrics.iteritems():
137 path = '.'.join((object_path, metric_name))
138 yield (path, ts, value)
139
140 assert isinstance(msg, dict)
141 type = msg.get('type')
142 if type == 'slice':
143 extractor, kw = extract_slice, dict(ts=msg['ts'], data=msg['data'])
144 else:
145 raise Exception('Unknown format')
146
147 batch = []
148 for path, timestamp, value in extractor(**kw):
149 batch.append((path, (timestamp, value)))
150 return batch
151
152
if __name__ == "__main__":
    # Script entry point: parse options, connect to Graphite and Kafka, then
    # forward every record of the Kafka topic to Graphite as a pickled batch.

    parser = OptionParser()
    parser.add_option("-K", "--kafka", dest="kafka",
                      default="localhost:9092", help="Kafka bootstrap server")
    parser.add_option("-c", "--consul", dest="consul",
                      default="localhost:8500",
                      help="Consul server (needed if kafka server is "
                           "specified with '@kafka' value)")
    parser.add_option("-t", "--topic", dest="topic", help="Kafka topic")
    parser.add_option("-H", "--host", dest="graphite_host",
                      default="localhost", help="Graphite host")
    parser.add_option("-p", "--port", dest="graphite_port", type=int,
                      default=2004, help="Graphite port")

    (options, args) = parser.parse_args()

    # Assign OptParse variables
    kafka = options.kafka
    consul = options.consul
    topic = options.topic
    host = options.graphite_host
    port = options.graphite_port

    # The topic has no default; fail early with a usage message instead of
    # handing None to the Kafka consumer.
    if not topic:
        parser.error("Kafka topic is required (-t/--topic)")

    # Connect to Graphite
    try:
        graphite = Graphite(host, port)
    except socket.gaierror:
        # socket.gaierror is a subclass of socket.error, so it has to be
        # handled first or this branch would never be reached.
        print("Invalid hostname for graphite host %s" % (host))
        sys.exit(1)
    except socket.error:
        print("Could not connect to graphite host %s:%s" % (host, port))
        sys.exit(1)
    log.info('Connected to graphite at {}:{}'.format(host, port))

    # Resolve Kafka value if it is based on consul lookup
    if kafka.startswith('@'):
        kafka = get_endpoint_from_consul(consul, kafka[1:])

    # Connect to Kafka
    try:
        log.info('connect-to-kafka', kafka=kafka)
        consumer = KafkaConsumer(topic, bootstrap_servers=kafka)
    except KafkaError as e:
        log.error('failed-to-connect-to-kafka', kafka=kafka, e=e)
        sys.exit(1)

    # Consume Kafka topic
    log.info('start-loop', topic=topic)
    for record in consumer:
        assert isinstance(record, ConsumerRecord)
        msg = record.value

        try:
            batch = _convert(simplejson.loads(msg))
        except Exception as e:
            # Best effort: a record that cannot be decoded or converted is
            # logged (with the reason) and skipped so the loop keeps running.
            log.warn('unknown-format', msg=msg, e=e)
            continue

        pickled = _pickle(batch)
        graphite.send(pickled)
        log.debug('sent', batch_len=len(batch))

    log.info('exited')
216 log.info('exited')