Initial KPI/PM support

Added a tiny program (and container) to shovel KPI
data from Kafka to graphite using carbon pickle
format. The utility is called 'shovel'. It is dockerized.

Reorganized Dockerfiles in their own dir to start cleaning
up top-level dir of Voltha.

A 3rd-party grafana/graphite container is added to the
system test ensemble, launched by docker-compose. With
the new shovel, this implements a KPI/PM metric store
with a very nice Web UI (grafana).

Finalized internal sample format and extended the new
diagnostics module to publish 2 initial metrics to
Kafka, which now nicely shows up via both kafkacat
and grafana.

The infrastructure is ready for arbitrary metrics now.

This commit accidentally picked up some ongoing change
on the Tibit integration side, but it is too complex
to untangle, so I leave it in; Nathan will push his
latest Tibit adapter code in the next 24h.

Change-Id: I6812dd5b198fef5cb19f111111111113fba8b625
diff --git a/shovel/main.py b/shovel/main.py
new file mode 100755
index 0000000..60a9d62
--- /dev/null
+++ b/shovel/main.py
@@ -0,0 +1,209 @@
+#!/usr/bin/env python
+
+"""
+A simple process to read time-series samples from a kafka topic and shove
+the data into graphite/carbon as pickled input.
+
+The code is based on a github/gist by phobos182
+(https://gist.github.com/phobos182/3931936).
+
As with all GitHub gists, it is covered by the MIT license.
+
+"""
+
+from optparse import OptionParser
+
+import simplejson
+from kafka import KafkaConsumer
+import pickle
+import struct
+import socket
+import sys
+import time
+
+from kafka.consumer.fetcher import ConsumerRecord
+from kafka.errors import KafkaError
+
+from common.utils.consulhelpers import get_endpoint_from_consul
+
+
class Graphite:
    """
    Minimal client for graphite's carbon pickle receiver.

    Keeps a single TCP connection to carbon and retries transient
    connect/send failures with exponential backoff.
    """

    def __init__(self, host='localhost', port=2004, retry=5, delay=3,
                 backoff=2, timeout=10):
        """
        :param host: graphite/carbon host name
        :param port: carbon pickle receiver port (2004 by default)
        :param retry: max attempts before giving up on connect/send
        :param delay: initial sleep (seconds) between attempts
        :param backoff: multiplier applied to delay after each attempt
        :param timeout: socket timeout in seconds
        """
        self.host = host
        self.port = port
        self.retry = retry
        self.delay = delay
        self.backoff = backoff
        self.timeout = timeout

        # Create initial socket and connect right away so construction
        # fails fast if graphite is unreachable
        self.conn = self._new_socket()
        self.connect()

    def _new_socket(self):
        """Create a fresh TCP socket with the configured timeout."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        return sock

    def _backoff(self, retry, delay, backoff):
        """
        One step of exponential backoff: sleep for delay seconds and
        return the updated (retry, delay, backoff) triple.

        :raises Exception: when the retry budget is exhausted
        """
        retry -= 1
        if retry == 0:
            raise Exception('Timeout')
        time.sleep(delay)
        delay *= backoff
        return retry, delay, backoff

    def _retry(self, exception, func, *args):
        """
        Retry calling func, catching the given exception (or tuple of
        exceptions), with exponential backoff.
        """
        retry = self.retry
        delay = self.delay
        backoff = self.backoff
        while retry > 0:
            try:
                return func(*args)
            except exception:
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def connect(self):
        """Connect to graphite, retrying with exponential backoff."""
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        while retry > 0:
            try:
                # Attempt to connect to Graphite, break if success
                self.conn.connect((self.host, self.port))
                break
            except socket.error:
                # Ditch this socket and create a brand new one: a closed
                # (or half-connected) socket can never be reused for
                # connect().  The original code called
                # self.conn.connect() with no arguments here, which
                # always raised TypeError instead of retrying.
                self.conn.close()
                self.conn = self._new_socket()
                retry, delay, backoff = self._backoff(retry, delay, backoff)

    def close(self):
        """Close connection to Graphite."""
        self.conn.close()

    def send(self, data, retry=3):
        """
        Send an already-pickled payload to graphite, reconnecting and
        retrying on socket errors.

        :param data: bytes in carbon pickle wire format (see _pickle)
        :param retry: unused; kept for backward compatibility.  The
            instance-level self.retry budget is what is honored (the
            original code overwrote this parameter immediately).
        """
        retry = self.retry
        backoff = self.backoff
        delay = self.delay
        # Attempt to send the data, reconnecting on any socket error
        while retry > 0:
            try:
                # Send data to socket
                self.conn.sendall(data)
                break
            except socket.error:
                # Reconnect on a fresh socket before retrying; the old
                # socket is unusable once a send has failed
                self.close()
                self.conn = self._new_socket()
                self.connect()
                retry, delay, backoff = self._backoff(retry, delay, backoff)
+
+
+def _pickle(batch):
+    """Pickle metrics into graphite format."""
+    payload = pickle.dumps(batch)
+    header = struct.pack("!L", len(payload))
+    message = header + payload
+    return message
+
+
+def _convert(msg):
+    """Convert a graphite key value string to pickle."""
+
+    def extract_slice(ts, data):
+        for object_path, metrics in data.iteritems():
+            for metric_name, value in metrics.iteritems():
+                path = '.'.join((object_path, metric_name))
+                yield (path, ts, value)
+
+    assert isinstance(msg, dict)
+    type = msg.get('type')
+    if type == 'slice':
+        extractor, kw = extract_slice, dict(ts=msg['ts'], data=msg['data'])
+    else:
+        raise Exception('Unknown format')
+
+    batch = []
+    for path, timestamp, value in extractor(**kw):
+        batch.append((path, (timestamp, value)))
+    return batch
+
+
if __name__ == "__main__":

    # Command-line interface: where to consume from (kafka, optionally
    # resolved via consul) and where to ship samples to (graphite).
    parser = OptionParser()
    parser.add_option("-K", "--kafka", dest="kafka",
                      default="localhost:9092", help="Kafka bootstrap server")
    parser.add_option("-c", "--consul", dest="consul",
                      default="localhost:8500",
                      help="Consul server (needed if kafka server is "
                           "specified with '@kafka' value)")
    parser.add_option("-t", "--topic", dest="topic", help="Kafka topic")
    parser.add_option("-H", "--host", dest="graphite_host",
                      default="localhost", help="Graphite host")
    parser.add_option("-p", "--port", dest="graphite_port", type=int,
                      default=2004, help="Graphite port")

    (options, args) = parser.parse_args()

    # Assign OptParse variables
    kafka = options.kafka
    consul = options.consul
    topic = options.topic
    host = options.graphite_host
    port = options.graphite_port

    # Connect to Graphite first; no point consuming kafka if we have
    # nowhere to store the metrics
    try:
        graphite = Graphite(host, port)
    except socket.gaierror:
        # gaierror is a subclass of socket.error, so it must be caught
        # FIRST to give the more specific message (the original order
        # made this handler unreachable)
        print("Invalid hostname for graphite host %s" % (host,))
        sys.exit(1)
    except socket.error:
        print("Could not connect to graphite host %s:%s" % (host, port))
        sys.exit(1)

    # Resolve Kafka value if it is based on consul lookup
    # (an '@service-name' argument means: look the endpoint up in consul)
    if kafka.startswith('@'):
        kafka = get_endpoint_from_consul(consul, kafka[1:])

    # Connect to Kafka
    try:
        print('Connecting to Kafka at {}'.format(kafka))
        consumer = KafkaConsumer(topic, bootstrap_servers=kafka)
    except KafkaError as e:
        print("Could not connect to kafka bootstrap server {}: {}".format(
            kafka, e))
        sys.exit(1)

    # Consume the Kafka topic forever, shoveling each sample batch
    # into graphite; malformed messages are logged and skipped so one
    # bad producer cannot stall the pipeline
    for record in consumer:
        assert isinstance(record, ConsumerRecord)
        msg = record.value

        try:
            batch = _convert(simplejson.loads(msg))
        except Exception:
            print("Unknown format, could not extract data: {}".format(msg))
            continue

        pickled = _pickle(batch)
        graphite.send(pickled)
        print("Sent %s metrics to Graphite" % (len(batch),))