Initial KPI/PM support
Added a tiny program (and container) to shovel KPI
data from Kafka to graphite using the carbon pickle
format. The utility is called 'shovel' and is dockerized.
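
For reference, a minimal standalone sketch of the carbon
pickle framing shovel speaks (host, metric path and values
are illustrative; 2004 is carbon's pickle port, as mapped
in the compose file):

    import pickle, socket, struct, time

    # a batch is a list of (metric_path, (timestamp, value)) tuples
    batch = [('voltha.internal.1.deferreds', (int(time.time()), 42))]
    payload = pickle.dumps(batch)
    # carbon expects a 4-byte big-endian length header before the payload
    message = struct.pack('!L', len(payload)) + payload
    sock = socket.create_connection(('localhost', 2004))
    sock.sendall(message)
    sock.close()
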
Reorganized Dockerfiles into their own dir to start cleaning
up top-level dir of Voltha.
A 3rd-party grafana/graphite container is added to the
system test ensemble, launched by docker-compose. With
the new shovel, this implements a KPI/PM metric store
with a very nice Web UI (grafana).
Finalized internal sample format and extended the new
diagnostics module to publish two initial metrics
(deferreds and rss-mb) to Kafka; they now show up nicely
via both kafkacat and grafana.
The infrastructure is ready for arbitrary metrics now.
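
For reference, a 'slice' sample as published by the
diagnostics module looks roughly like this (instance id
and values are made up):

    {
        'type': 'slice',
        'ts': 1481364752,  # epoch seconds (arrow.utcnow().timestamp)
        'data': {
            'voltha.internal.compose_voltha_1': {
                'deferreds': 42,
                'rss-mb': 1
            }
        }
    }

Shovel flattens each entry into graphite paths such as
voltha.internal.compose_voltha_1.deferreds.
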
This commit accidentally picked up some ongoing changes
on the Tibit integration side, but they are too complex
to untangle, so I am leaving them in; Nathan will push his
latest Tibit adapter code in the next 24h.
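
To eyeball the raw samples without grafana, a few lines of
kafka-python work too (the broker address is an assumption,
adjust to your setup):

    from kafka import KafkaConsumer
    from simplejson import loads

    consumer = KafkaConsumer('voltha.kpi',
                             bootstrap_servers='localhost:9092')
    for record in consumer:
        print loads(record.value)
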
Change-Id: I6812dd5b198fef5cb19f111111111113fba8b625
diff --git a/Makefile b/Makefile
index a25668b..00fecd9 100644
--- a/Makefile
+++ b/Makefile
@@ -75,13 +75,14 @@
@echo
build: protos docker-base
- docker build -t cord/voltha -f Dockerfile.voltha .
- docker build -t cord/chameleon -f Dockerfile.chameleon .
- docker build -t cord/ofagent -f Dockerfile.ofagent .
- docker build -t cord/podder -f Dockerfile.podder .
+ docker build -t cord/voltha -f docker/Dockerfile.voltha .
+ docker build -t cord/chameleon -f docker/Dockerfile.chameleon .
+ docker build -t cord/ofagent -f docker/Dockerfile.ofagent .
+ docker build -t cord/podder -f docker/Dockerfile.podder .
+ docker build -t cord/shovel -f docker/Dockerfile.shovel .
docker-base:
- docker build -t cord/voltha-base -f Dockerfile.base .
+ docker build -t cord/voltha-base -f docker/Dockerfile.base .
protos:
make -C voltha/protos
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index e35d006..87d42ff 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -57,6 +57,7 @@
- consul
volumes:
- "/var/run/docker.sock:/tmp/docker.sock"
+
#
# Fluentd log server
#
@@ -79,6 +80,42 @@
restart: unless-stopped
#
+ # Graphite-Grafana-statsd service instance
+ # (demo place-holder for external KPI system)
+ #
+ grafana:
+ image: kamon/grafana_graphite
+ ports:
+ - "8882:80"
+ - "2003:2003"
+ - "2004:2004"
+ - "8126:8126"
+ - "8125:8125/udp"
+ environment:
+ SERVICE_80_NAME: "grafana-web-ui"
+ SERVICE_2003_NAME: "carbon-plain-text-intake"
+ SERVICE_2004_NAME: "carbon-pickle-intake"
+ SERVICE_8126_NAME: "statsd-tcp-intake"
+ SERVICE_8125_NAME: "statsd-udp-intake"
+
+ #
+ # Shovel (Kafka-graphite-gateway)
+ #
+ shovel:
+ image: cord/shovel
+ command: [
+ "/shovel/shovel/main.py",
+ "--kafka=@kafka",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--topic=voltha.kpi",
+ "--host=${DOCKER_HOST_IP}"
+ ]
+ depends_on:
+ - consul
+ - kafka
+ - grafana
+
+ #
# Voltha server instance(s)
#
voltha:
diff --git a/Dockerfile.base b/docker/Dockerfile.base
similarity index 100%
rename from Dockerfile.base
rename to docker/Dockerfile.base
diff --git a/Dockerfile.base.alpine b/docker/Dockerfile.base.alpine
similarity index 100%
rename from Dockerfile.base.alpine
rename to docker/Dockerfile.base.alpine
diff --git a/Dockerfile.chameleon b/docker/Dockerfile.chameleon
similarity index 100%
rename from Dockerfile.chameleon
rename to docker/Dockerfile.chameleon
diff --git a/Dockerfile.ofagent b/docker/Dockerfile.ofagent
similarity index 100%
rename from Dockerfile.ofagent
rename to docker/Dockerfile.ofagent
diff --git a/Dockerfile.podder b/docker/Dockerfile.podder
similarity index 100%
rename from Dockerfile.podder
rename to docker/Dockerfile.podder
diff --git a/docker/Dockerfile.shovel b/docker/Dockerfile.shovel
new file mode 100644
index 0000000..4ae1808
--- /dev/null
+++ b/docker/Dockerfile.shovel
@@ -0,0 +1,30 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM cord/voltha-base
+
+MAINTAINER Zsolt Haraszti <zharaszt@ciena.com>
+MAINTAINER Ali Al-Shabibi <ali.al-shabibi@onlab.us>
+MAINTAINER Nathan Knuth <nathan.knuth@tibitcom.com>
+
+# Bundle app source
+RUN mkdir /shovel && touch /shovel/__init__.py
+ENV PYTHONPATH=/shovel
+COPY common /shovel/common
+COPY shovel /shovel/shovel
+
+# Exposing process and default entry point
+CMD ["python", "shovel/shovel/main.py"]
diff --git a/Dockerfile.voltha b/docker/Dockerfile.voltha
similarity index 100%
rename from Dockerfile.voltha
rename to docker/Dockerfile.voltha
diff --git a/requirements.txt b/requirements.txt
index 8f7d6a7..3c660d7 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
argparse==1.2.1
+arrow>=0.10.0
colorama>=0.2.5
cython==0.24.1
decorator>=3.4.0
@@ -11,6 +12,7 @@
hexdump>=3.3
jinja2>=2.8
jsonpatch>=1.14
+kafka_python>=1.3.1
klein>=15.3.1
networkx>=1.11
nose>=1.3.7
diff --git a/shovel/main.py b/shovel/main.py
new file mode 100755
index 0000000..60a9d62
--- /dev/null
+++ b/shovel/main.py
@@ -0,0 +1,210 @@
+#!/usr/bin/env python
+
+"""
+A simple process to read time-series samples from a kafka topic and shove
+the data into graphite/carbon as pickled input.
+
+The code is based on a github/gist by phobos182
+(https://gist.github.com/phobos182/3931936).
+
+Like all GitHub gists, it is covered by the MIT license.
+
+"""
+
+from optparse import OptionParser
+
+import simplejson
+from kafka import KafkaConsumer
+import pickle
+import struct
+import socket
+import sys
+import time
+
+from kafka.consumer.fetcher import ConsumerRecord
+from kafka.errors import KafkaError
+
+from common.utils.consulhelpers import get_endpoint_from_consul
+
+
+class Graphite:
+
+ def __init__(self, host='localhost', port=2004, retry=5, delay=3,
+ backoff=2, timeout=10):
+ self.host = host
+ self.port = port
+ self.retry = retry
+ self.delay = delay
+ self.backoff = backoff
+ self.timeout = timeout
+
+ # Create initial socket
+ self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.conn.settimeout(self.timeout)
+ # Initiate connection
+ self.connect()
+
+
+ def _backoff(self, retry, delay, backoff):
+ """Exponential backoff."""
+ retry -= 1
+ if retry == 0:
+ raise Exception('Timeout')
+ time.sleep(delay)
+ delay *= backoff
+ return retry, delay, backoff
+
+
+ def _retry(self, exception, func, *args):
+ """
+ Retry calling the func catching a tuple of exceptions with backoff.
+ """
+ retry = self.retry
+ delay = self.delay
+ backoff = self.backoff
+ while retry > 0:
+ try:
+ return func(*args)
+ except exception, e:
+ retry, delay, backoff = self._backoff(retry, delay, backoff)
+
+
+ def connect(self):
+ """Connect to graphite."""
+ retry = self.retry
+ backoff = self.backoff
+ delay = self.delay
+ while retry > 0:
+ try:
+ # Attempt to connect to Graphite, break if success
+ self.conn.connect((self.host, self.port))
+ break
+ except socket.error, e:
+                # Ditch this socket and create a fresh one before retrying
+                self.conn.close()
+                self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                self.conn.settimeout(self.timeout)
+                retry, delay, backoff = self._backoff(retry, delay, backoff)
+
+
+ def close(self):
+ """Close connection go Graphite."""
+ self.conn.close()
+
+
+    def send(self, data):
+ """Send data to graphite."""
+ retry = self.retry
+ backoff = self.backoff
+ delay = self.delay
+ # Attempt to send any data in the queue
+ while retry > 0:
+ # Check socket
+ if not self.conn:
+                # Attempt to re-establish the connection
+ self.close()
+ self.connect()
+ retry, delay, backoff = self._backoff(retry, delay, backoff)
+ continue
+ try:
+ # Send data to socket
+ self.conn.sendall(data)
+ break
+ except socket.error, e:
+ self.close()
+ self.connect()
+ retry, delay, backoff = self._backoff(retry, delay, backoff)
+ continue
+
+
+def _pickle(batch):
+ """Pickle metrics into graphite format."""
+ payload = pickle.dumps(batch)
+ header = struct.pack("!L", len(payload))
+ message = header + payload
+ return message
+
+
+def _convert(msg):
+ """Convert a graphite key value string to pickle."""
+
+ def extract_slice(ts, data):
+ for object_path, metrics in data.iteritems():
+ for metric_name, value in metrics.iteritems():
+ path = '.'.join((object_path, metric_name))
+ yield (path, ts, value)
+
+ assert isinstance(msg, dict)
+ type = msg.get('type')
+ if type == 'slice':
+ extractor, kw = extract_slice, dict(ts=msg['ts'], data=msg['data'])
+ else:
+ raise Exception('Unknown format')
+
+ batch = []
+ for path, timestamp, value in extractor(**kw):
+ batch.append((path, (timestamp, value)))
+ return batch
+
+
+if __name__ == "__main__":
+
+ parser = OptionParser()
+ parser.add_option("-K", "--kafka", dest="kafka",
+ default="localhost:9092", help="Kafka bootstrap server")
+ parser.add_option("-c", "--consul", dest="consul",
+ default="localhost:8500",
+ help="Consul server (needed if kafak server is specifed"
+ "with '@kafka' value)")
+ parser.add_option("-t", "--topic", dest="topic", help="Kafka topic")
+ parser.add_option("-H", "--host", dest="graphite_host",
+ default="localhost", help="Graphite host")
+ parser.add_option("-p", "--port", dest="graphite_port", type=int,
+ default=2004, help="Graphite port")
+
+ (options, args) = parser.parse_args()
+
+ # Assign OptParse variables
+ kafka = options.kafka
+ consul = options.consul
+ topic = options.topic
+ host = options.graphite_host
+ port = options.graphite_port
+
+ # Connect to Graphite
+ try:
+ graphite = Graphite(host, port)
+    except socket.gaierror, e:
+        print "Invalid hostname for graphite host %s" % (host)
+        sys.exit(1)
+    except socket.error, e:
+        print "Could not connect to graphite host %s:%s" % (host, port)
+        sys.exit(1)
+
+ # Resolve Kafka value if it is based on consul lookup
+ if kafka.startswith('@'):
+ kafka = get_endpoint_from_consul(consul, kafka[1:])
+
+ # Connect to Kafka
+ try:
+ print 'Connecting to Kafka at {}'.format(kafka)
+ consumer = KafkaConsumer(topic, bootstrap_servers=kafka)
+ except KafkaError, e:
+ print "Could not connect to kafka bootstrap server {}: {}".format(
+ kafka, e)
+ sys.exit(1)
+
+ # Consume Kafka topic
+ for record in consumer:
+ assert isinstance(record, ConsumerRecord)
+ msg = record.value
+
+ try:
+ batch = _convert(simplejson.loads(msg))
+ except Exception, e:
+ print "Unknown format, could not extract data: {}".format(msg)
+ continue
+
+ pickled = _pickle(batch)
+ graphite.send(pickled)
+ print "Sent %s metrics to Graphite" % (len(batch))
diff --git a/voltha/adapters/tibit_olt/README.md b/voltha/adapters/tibit_olt/README.md
index c9c429e..a454b1b 100644
--- a/voltha/adapters/tibit_olt/README.md
+++ b/voltha/adapters/tibit_olt/README.md
@@ -102,6 +102,18 @@
"ACTIVATING"
"ENABLED"
```
+When the device is ACTIVE, the corresponding logical device and logical ports
+should have been created. To verify, use the following commands:
+
+```
+curl -s http://localhost:8881/api/v1/local/logical_devices | jq '.'
+# Note: Need to pull out logical device id.
+curl -s http://localhost:8881/api/v1/local/logical_devices/47d2bb42a2c6/ports | jq '.'
+```
+
# OLD stuff
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index a1f9417..4a0a323 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -29,11 +29,19 @@
from common.frameio.frameio import BpfProgramFilter
from voltha.registry import registry
from voltha.adapters.interface import IAdapterInterface
+from voltha.core.logical_device_agent import mac_str_to_tuple
from voltha.protos.adapter_pb2 import Adapter, AdapterConfig
from voltha.protos.device_pb2 import DeviceType, DeviceTypes
from voltha.protos.health_pb2 import HealthStatus
from voltha.protos.common_pb2 import LogLevel, ConnectStatus
+from voltha.protos.common_pb2 import OperStatus, AdminState
+
+from voltha.protos.logical_device_pb2 import LogicalDevice, LogicalPort
+from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_port, OFPPF_10GB_FD, \
+ OFPPF_FIBER, OFPPS_LIVE, ofp_switch_features, OFPC_PORT_STATS, \
+ OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
+
from scapy.packet import Packet, bind_layers
from scapy.fields import StrField
@@ -134,8 +142,6 @@
# if we got response, we can fill out the device info, mark the device
# reachable
- import pdb
- pdb.set_trace()
device.root = True
device.vendor = 'Tibit stuff'
@@ -147,6 +153,77 @@
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
+ # then shortly after we create some ports for the device
+ log.info('create-port')
+ nni_port = Port(
+ port_no=2,
+ label='NNI facing Ethernet port',
+ type=Port.ETHERNET_NNI,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ )
+ self.adapter_agent.add_port(device.id, nni_port)
+ self.adapter_agent.add_port(device.id, Port(
+ port_no=1,
+ label='PON port',
+ type=Port.PON_OLT,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ ))
+
+ log.info('create-logical-device')
+ # then shortly after we create the logical device with one port
+ # that will correspond to the NNI port
+ logical_device_id = uuid4().hex[:12]
+ ld = LogicalDevice(
+ id=logical_device_id,
+ datapath_id=int('0x' + logical_device_id[:8], 16), # from id
+ desc=ofp_desc(
+ mfr_desc=device.vendor,
+ hw_desc=jdev['results']['device'],
+ sw_desc=jdev['results']['firmware'],
+ serial_num=uuid4().hex,
+ dp_desc='n/a'
+ ),
+ switch_features=ofp_switch_features(
+ n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=( # TODO and ditto
+ OFPC_FLOW_STATS
+ | OFPC_TABLE_STATS
+ | OFPC_PORT_STATS
+ | OFPC_GROUP_STATS
+ )
+ ),
+ root_device_id=device.id
+ )
+ self.adapter_agent.create_logical_device(ld)
+ cap = OFPPF_10GB_FD | OFPPF_FIBER
+ self.adapter_agent.add_logical_port(ld.id, LogicalPort(
+ id='nni',
+ ofp_port=ofp_port(
+ port_no=129,
+ hw_addr=mac_str_to_tuple(device.mac_address),
+ name='nni',
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_10GB_FD,
+ max_speed=OFPPF_10GB_FD
+ ),
+ device_id=device.id,
+ device_port_no=nni_port.port_no,
+ root_port=True
+ ))
+
+ # and finally update to active
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = ld.id
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+
def _rcv_io(self, port, frame):
        log.info('frame-received')
@@ -187,3 +264,4 @@
def receive_proxied_message(self, proxy_address, msg):
raise NotImplementedError()
+
diff --git a/voltha/northbound/diagnostics.py b/voltha/northbound/diagnostics.py
index 1395d2d..ee377dd 100644
--- a/voltha/northbound/diagnostics.py
+++ b/voltha/northbound/diagnostics.py
@@ -18,14 +18,14 @@
"""
Voltha internal diagnostics
"""
-import structlog
-import time
+import arrow
import gc
+import structlog
+import resource
from simplejson import dumps
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
-from twisted.internet import reactor
from zope.interface import implementer
from common.event_bus import EventBusClient
@@ -39,7 +39,8 @@
def __init__(self, config):
self.config = config
- self.periodic_check_interval = config.get('periodic_check_interval', 15)
+ self.periodic_check_interval = config.get(
+ 'periodic_check_interval', 15)
self.periodic_checks = None
self.event_bus = EventBusClient()
self.instance_id = registry('main').get_args().instance_id
@@ -59,17 +60,23 @@
def run_periodic_checks(self):
- ts = time.time(), # TODO switch to '2016.12.10T10:12:32Z' format?
+ ts = arrow.utcnow().timestamp
- backlog = dict(
+ def deferreds():
+ return len(gc.get_referrers(Deferred))
+
+ def rss_mb():
+        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024  # KB on Linux
+
+ data = dict(
+ type='slice',
ts=ts,
- object='voltha.{}'.format(self.instance_id),
- metric='internal-backlog',
- value=len(gc.get_referrers(Deferred))
+ data={
+ 'voltha.internal.{}'.format(self.instance_id): {
+ 'deferreds': deferreds(),
+ 'rss-mb': rss_mb(),
+ }
+ }
)
-
- self.event_bus.publish('kpis', dumps(backlog))
-
+ self.event_bus.publish('kpis', dumps(data))
log.debug('periodic-check', ts=ts)
-
- Deferred()