Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | |
| 3 | """ |
| 4 | A simple process to read time-series samples from a kafka topic and shove |
| 5 | the data into graphite/carbon as pickled input. |
| 6 | |
| 7 | The code is based on a github/gist by phobos182 |
| 8 | (https://gist.github.com/phobos182/3931936). |
| 9 | |
| 10 | As all GitHib gists, it is covered by the MIT license. |
| 11 | |
| 12 | """ |
| 13 | |
| 14 | from optparse import OptionParser |
| 15 | |
| 16 | import simplejson |
Zsolt Haraszti | 1b7c036 | 2016-12-12 09:45:47 -0800 | [diff] [blame] | 17 | import structlog |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 18 | from kafka import KafkaConsumer |
| 19 | import pickle |
| 20 | import struct |
| 21 | import socket |
| 22 | import sys |
| 23 | import time |
| 24 | |
| 25 | from kafka.consumer.fetcher import ConsumerRecord |
| 26 | from kafka.errors import KafkaError |
| 27 | |
| 28 | from common.utils.consulhelpers import get_endpoint_from_consul |
| 29 | |
| 30 | |
Zsolt Haraszti | 1b7c036 | 2016-12-12 09:45:47 -0800 | [diff] [blame] | 31 | log = structlog.get_logger() |
| 32 | |
| 33 | |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 34 | class Graphite: |
| 35 | |
| 36 | def __init__(self, host='localhost', port=2004, retry=5, delay=3, |
| 37 | backoff=2, timeout=10): |
| 38 | self.host = host |
| 39 | self.port = port |
| 40 | self.retry = retry |
| 41 | self.delay = delay |
| 42 | self.backoff = backoff |
| 43 | self.timeout = timeout |
| 44 | |
| 45 | # Create initial socket |
| 46 | self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 47 | self.conn.settimeout(self.timeout) |
| 48 | # Initiate connection |
| 49 | self.connect() |
| 50 | |
| 51 | |
| 52 | def _backoff(self, retry, delay, backoff): |
| 53 | """Exponential backoff.""" |
| 54 | retry -= 1 |
| 55 | if retry == 0: |
| 56 | raise Exception('Timeout') |
| 57 | time.sleep(delay) |
| 58 | delay *= backoff |
| 59 | return retry, delay, backoff |
| 60 | |
| 61 | |
| 62 | def _retry(self, exception, func, *args): |
| 63 | """ |
| 64 | Retry calling the func catching a tuple of exceptions with backoff. |
| 65 | """ |
| 66 | retry = self.retry |
| 67 | delay = self.delay |
| 68 | backoff = self.backoff |
| 69 | while retry > 0: |
| 70 | try: |
| 71 | return func(*args) |
| 72 | except exception, e: |
| 73 | retry, delay, backoff = self._backoff(retry, delay, backoff) |
| 74 | |
| 75 | |
| 76 | def connect(self): |
| 77 | """Connect to graphite.""" |
| 78 | retry = self.retry |
| 79 | backoff = self.backoff |
| 80 | delay = self.delay |
| 81 | while retry > 0: |
| 82 | try: |
| 83 | # Attempt to connect to Graphite, break if success |
| 84 | self.conn.connect((self.host, self.port)) |
| 85 | break |
| 86 | except socket.error, e: |
| 87 | # Ditch this socket. Create a new one |
| 88 | self.conn.close() |
| 89 | self.conn.connect() |
| 90 | retry, delay, backoff = self._backoff(retry, delay, backoff) |
| 91 | |
| 92 | |
| 93 | def close(self): |
| 94 | """Close connection go Graphite.""" |
| 95 | self.conn.close() |
| 96 | |
| 97 | |
| 98 | def send(self, data, retry=3): |
| 99 | """Send data to graphite.""" |
| 100 | retry = self.retry |
| 101 | backoff = self.backoff |
| 102 | delay = self.delay |
| 103 | # Attempt to send any data in the queue |
| 104 | while retry > 0: |
| 105 | # Check socket |
| 106 | if not self.conn: |
| 107 | # Attempt to restablish connection |
| 108 | self.close() |
| 109 | self.connect() |
| 110 | retry, delay, backoff = self._backoff(retry, delay, backoff) |
| 111 | continue |
| 112 | try: |
| 113 | # Send data to socket |
| 114 | self.conn.sendall(data) |
| 115 | break |
| 116 | except socket.error, e: |
| 117 | self.close() |
| 118 | self.connect() |
| 119 | retry, delay, backoff = self._backoff(retry, delay, backoff) |
| 120 | continue |
| 121 | |
| 122 | |
| 123 | def _pickle(batch): |
| 124 | """Pickle metrics into graphite format.""" |
| 125 | payload = pickle.dumps(batch) |
| 126 | header = struct.pack("!L", len(payload)) |
| 127 | message = header + payload |
| 128 | return message |
| 129 | |
| 130 | |
| 131 | def _convert(msg): |
| 132 | """Convert a graphite key value string to pickle.""" |
| 133 | |
Zsolt Haraszti | 0778a24 | 2017-01-18 01:11:54 -0800 | [diff] [blame] | 134 | def extract_slice(ts, prefixes): |
| 135 | for object_path, metrics in prefixes.iteritems(): |
| 136 | for metric_name, value in metrics['metrics'].iteritems(): |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 137 | path = '.'.join((object_path, metric_name)) |
| 138 | yield (path, ts, value) |
| 139 | |
| 140 | assert isinstance(msg, dict) |
| 141 | type = msg.get('type') |
| 142 | if type == 'slice': |
Zsolt Haraszti | 0778a24 | 2017-01-18 01:11:54 -0800 | [diff] [blame] | 143 | extractor, kw = extract_slice, dict(ts=msg['ts'], |
| 144 | prefixes=msg['prefixes']) |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 145 | else: |
| 146 | raise Exception('Unknown format') |
| 147 | |
| 148 | batch = [] |
| 149 | for path, timestamp, value in extractor(**kw): |
| 150 | batch.append((path, (timestamp, value))) |
| 151 | return batch |
| 152 | |
| 153 | |
| 154 | if __name__ == "__main__": |
| 155 | |
| 156 | parser = OptionParser() |
| 157 | parser.add_option("-K", "--kafka", dest="kafka", |
| 158 | default="localhost:9092", help="Kafka bootstrap server") |
| 159 | parser.add_option("-c", "--consul", dest="consul", |
| 160 | default="localhost:8500", |
| 161 | help="Consul server (needed if kafak server is specifed" |
| 162 | "with '@kafka' value)") |
| 163 | parser.add_option("-t", "--topic", dest="topic", help="Kafka topic") |
| 164 | parser.add_option("-H", "--host", dest="graphite_host", |
| 165 | default="localhost", help="Graphite host") |
| 166 | parser.add_option("-p", "--port", dest="graphite_port", type=int, |
| 167 | default=2004, help="Graphite port") |
| 168 | |
| 169 | (options, args) = parser.parse_args() |
| 170 | |
| 171 | # Assign OptParse variables |
| 172 | kafka = options.kafka |
| 173 | consul = options.consul |
| 174 | topic = options.topic |
| 175 | host = options.graphite_host |
| 176 | port = options.graphite_port |
| 177 | |
| 178 | # Connect to Graphite |
| 179 | try: |
| 180 | graphite = Graphite(host, port) |
| 181 | except socket.error, e: |
| 182 | print "Could not connect to graphite host %s:%s" % (host, port) |
| 183 | sys.exit(1) |
| 184 | except socket.gaierror, e: |
| 185 | print "Invalid hostname for graphite host %s" % (host) |
| 186 | sys.exit(1) |
Zsolt Haraszti | 1b7c036 | 2016-12-12 09:45:47 -0800 | [diff] [blame] | 187 | log.info('Connected to graphite at {}:{}'.format(host, port)) |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 188 | |
| 189 | # Resolve Kafka value if it is based on consul lookup |
| 190 | if kafka.startswith('@'): |
| 191 | kafka = get_endpoint_from_consul(consul, kafka[1:]) |
| 192 | |
| 193 | # Connect to Kafka |
| 194 | try: |
Zsolt Haraszti | 1b7c036 | 2016-12-12 09:45:47 -0800 | [diff] [blame] | 195 | log.info('connect-to-kafka', kafka=kafka) |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 196 | consumer = KafkaConsumer(topic, bootstrap_servers=kafka) |
| 197 | except KafkaError, e: |
Zsolt Haraszti | 1b7c036 | 2016-12-12 09:45:47 -0800 | [diff] [blame] | 198 | log.error('failed-to-connect-to-kafka', kafka=kafka, e=e) |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 199 | sys.exit(1) |
| 200 | |
| 201 | # Consume Kafka topic |
Zsolt Haraszti | 1b7c036 | 2016-12-12 09:45:47 -0800 | [diff] [blame] | 202 | log.info('start-loop', topic=topic) |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 203 | for record in consumer: |
| 204 | assert isinstance(record, ConsumerRecord) |
| 205 | msg = record.value |
| 206 | |
| 207 | try: |
| 208 | batch = _convert(simplejson.loads(msg)) |
| 209 | except Exception, e: |
Zsolt Haraszti | 1b7c036 | 2016-12-12 09:45:47 -0800 | [diff] [blame] | 210 | log.warn('unknown-format', msg=msg) |
Zsolt Haraszti | aa4626e | 2016-12-08 16:53:06 -0800 | [diff] [blame] | 211 | continue |
| 212 | |
| 213 | pickled = _pickle(batch) |
| 214 | graphite.send(pickled) |
Zsolt Haraszti | 1b7c036 | 2016-12-12 09:45:47 -0800 | [diff] [blame] | 215 | log.debug('sent', batch_len=len(batch)) |
| 216 | |
| 217 | log.info('exited') |