#!/usr/bin/env python

"""
A simple process to read time-series samples from a kafka topic and shove
the data into graphite/carbon as pickled input.

The code is based on a github/gist by phobos182
(https://gist.github.com/phobos182/3931936).

As with all GitHub gists, it is covered by the MIT license.

"""

from optparse import OptionParser

import simplejson
import structlog
from kafka import KafkaConsumer
import pickle
import struct
import socket
import sys
import time

from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError

from common.utils.consulhelpers import get_endpoint_from_consul


log = structlog.get_logger()


class Graphite(object):
    """TCP client that pushes raw byte payloads to a Graphite/carbon port.

    Connection attempts and sends are retried with exponential backoff. The
    retry budget is ``retry`` attempts; the first wait is ``delay`` seconds
    and each subsequent wait is multiplied by ``backoff``. ``timeout`` is
    applied to the socket via ``settimeout``.

    When the retry budget is exhausted a plain ``Exception('Timeout')`` is
    raised (see ``_backoff``).
    """

    def __init__(self, host='localhost', port=2004, retry=5, delay=3,
                 backoff=2, timeout=10):
        self.host = host
        self.port = port
        self.retry = retry
        self.delay = delay
        self.backoff = backoff
        self.timeout = timeout

        # The socket is created lazily by connect(); None means
        # "not connected".
        self.conn = None
        # Initiate connection
        self.connect()

    def _new_socket(self):
        """Return a fresh, unconnected TCP socket with the timeout applied."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        return sock

    def _backoff(self, retry, delay, backoff):
        """Consume one retry, sleep for ``delay`` and grow the delay.

        Returns the updated ``(retry, delay, backoff)`` tuple.
        Raises ``Exception('Timeout')`` when no retries are left.
        """
        retry -= 1
        if retry <= 0:
            raise Exception('Timeout')
        time.sleep(delay)
        delay *= backoff
        return retry, delay, backoff

    def _retry(self, exception, func, *args):
        """
        Retry calling the func catching a tuple of exceptions with backoff.

        Returns whatever ``func(*args)`` returns on the first successful
        call. Exceptions not matching ``exception`` propagate immediately;
        ``Exception('Timeout')`` is raised once the retries are used up.
        """
        retry = self.retry
        delay = self.delay
        backoff = self.backoff
        while retry > 0:
            try:
                return func(*args)
            except exception:
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def connect(self):
        """Connect to graphite, retrying with backoff on socket errors.

        Raises ``Exception('Timeout')`` when every attempt failed.
        """
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            if self.conn is None:
                # A socket that failed to connect or was closed cannot be
                # reused, so every attempt starts from a fresh one.
                self.conn = self._new_socket()
            try:
                # Attempt to connect to Graphite, break if success
                self.conn.connect((self.host, self.port))
                break
            except socket.error:
                # Ditch this socket; the next iteration creates a new one
                self.close()
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def close(self):
        """Close connection to Graphite (safe to call when not connected)."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        finally:
            # Mark as disconnected so connect()/send() build a new socket.
            self.conn = None

    def send(self, data, retry=3):
        """Send data to graphite, reconnecting and retrying on socket errors.

        ``data`` is written with ``sendall``; after a failure the whole
        payload is sent again on a new connection.

        NOTE: the ``retry`` argument is accepted for backward compatibility
        but is not used; the instance-level ``retry``/``delay``/``backoff``
        settings always apply.

        Raises ``Exception('Timeout')`` when the retries are exhausted,
        either while sending or while reconnecting.
        """
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            try:
                if self.conn is None:
                    # Attempt to re-establish the connection
                    self.connect()
                # Send data to socket
                self.conn.sendall(data)
                break
            except socket.error:
                # Drop the broken connection; it is rebuilt on the next pass
                self.close()
                retry, delay, backoff = self._backoff(retry, delay, backoff)


def _pickle(batch):
    """Pickle metrics into graphite format."""
    payload = pickle.dumps(batch)
    header = struct.pack("!L", len(payload))
    message = header + payload
    return message


def _convert(msg):
    """Convert a graphite key value string to pickle."""

    def extract_slice(ts, data):
        for object_path, metrics in data.iteritems():
            for metric_name, value in metrics.iteritems():
                path = '.'.join((object_path, metric_name))
                yield (path, ts, value)

    assert isinstance(msg, dict)
    type = msg.get('type')
    if type == 'slice':
        extractor, kw = extract_slice, dict(ts=msg['ts'], data=msg['data'])
    else:
        raise Exception('Unknown format')

    batch = []
    for path, timestamp, value in extractor(**kw):
        batch.append((path, (timestamp, value)))
    return batch


if __name__ == "__main__":
    # Script entry point: parse options, connect to Graphite and Kafka, then
    # forward every Kafka record to Graphite as one pickled batch.

    parser = OptionParser()
    parser.add_option("-K", "--kafka", dest="kafka",
                      default="localhost:9092", help="Kafka bootstrap server")
    parser.add_option("-c", "--consul", dest="consul",
                      default="localhost:8500",
                      help="Consul server (needed if kafka server is "
                           "specified with '@kafka' value)")
    parser.add_option("-t", "--topic", dest="topic", help="Kafka topic")
    parser.add_option("-H", "--host", dest="graphite_host",
                      default="localhost", help="Graphite host")
    parser.add_option("-p", "--port", dest="graphite_port", type=int,
                      default=2004, help="Graphite port")

    (options, args) = parser.parse_args()

    # Assign OptParse variables
    kafka = options.kafka
    consul = options.consul
    topic = options.topic
    host = options.graphite_host
    port = options.graphite_port

    # The topic has no default; fail early with a usage message rather than
    # handing None to the Kafka consumer.
    if not topic:
        parser.error("a Kafka topic must be given with -t/--topic")

    # Connect to Graphite
    try:
        graphite = Graphite(host, port)
    except socket.gaierror:
        # Must precede socket.error: gaierror is a subclass of it.
        print("Invalid hostname for graphite host %s" % (host))
        sys.exit(1)
    except socket.error:
        print("Could not connect to graphite host %s:%s" % (host, port))
        sys.exit(1)
    except Exception as e:
        # Graphite raises a plain Exception('Timeout') once its connection
        # retries are exhausted.
        log.error('failed-to-connect-to-graphite', host=host, port=port, e=e)
        sys.exit(1)
    log.info('Connected to graphite at {}:{}'.format(host, port))

    # Resolve Kafka value if it is based on consul lookup
    if kafka.startswith('@'):
        kafka = get_endpoint_from_consul(consul, kafka[1:])

    # Connect to Kafka
    try:
        log.info('connect-to-kafka', kafka=kafka)
        consumer = KafkaConsumer(topic, bootstrap_servers=kafka)
    except KafkaError as e:
        log.error('failed-to-connect-to-kafka', kafka=kafka, e=e)
        sys.exit(1)

    # Consume Kafka topic
    log.info('start-loop', topic=topic)
    for record in consumer:
        assert isinstance(record, ConsumerRecord)
        msg = record.value

        try:
            batch = _convert(simplejson.loads(msg))
        except Exception as e:
            # Best effort: skip records that cannot be parsed or converted,
            # but keep the reason in the log.
            log.warn('unknown-format', msg=msg, e=e)
            continue

        pickled = _pickle(batch)
        # send() raises Exception('Timeout') if Graphite stays unreachable;
        # that is left to propagate and terminate the process.
        graphite.send(pickled)
        log.debug('sent', batch_len=len(batch))

    log.info('exited')
