Consul registration and kv interface added
Additional changes:
- Reworked command line parsing
- Added cleaner default handling, also reading certain environmen vars
- Many new options
- Docker compose file to start a consul + fluentd + voltha combo for
testing purposes
diff --git a/.gitignore b/.gitignore
index 0f7b7b3..e380d44 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,8 @@
# Ansible
ansible/*.retry
+# Virtualenv
+venv
+
+# Temp data dir for testing
+tmp
diff --git a/BUILD.md b/BUILD.md
index 1bb4537..96c7d8b 100644
--- a/BUILD.md
+++ b/BUILD.md
@@ -11,6 +11,8 @@
* Git client
* Working installation of Vagrant
+* jq -- a useful command line too to work with JSON data. On the MAC, you can install jq with ```brew install jq```; on Ubuntu you can do it with ```sudo apt-get install jq```. You will not regret it.
+
### Build
@@ -23,13 +25,13 @@
make
```
-The above has generated a new Docker image '''cord/voltha''' inside the VM:
+The above has generated a new Docker image '''cord/voltha''' inside the VM. To see it, run:
```
docker images
```
-### Run in stand-alone (solo) mode
+### Run in stand-alone mode
The simplest way to run the image (in the foreground):
@@ -37,6 +39,58 @@
docker run -ti cord/voltha
```
+Unless you happen to have a consul agent running on your local system, you shall see that voltha is trying to connect to a consul agent, without success.
+
+To bring up a consul agent, you can use docker-compose with the provided compose file:
+
+```
+docker-compose -f compose/docker-compose-system-test.yml up -d consul
+```
+
+This should have started a consul docker container:
+
+```
+docker-compose -f compose/docker-compose-system-test.yml ps
+```
+
+The above should list the consul conatiner.
+
+To verify that consul is indeed up, you can point your web browser to [http://localhost:8500/ui](http://localhost:8500/ui).
+Alternatively, you can use curl to access consul's REST API. For example:
+
+```
+curl -s http://localhost:8500/v1/status/leader | jq -r .
+```
+
+This should print the IP address (on the docker network) and port number of the internal gossip API for our consul instance (you can ignore the actual data).
+
+Once consul is up, you can extract its IP address programmatically by:
+
+```
+CONSUL_IP=`docker inspect compose_consul_1 | \
+ jq -r '.[0].NetworkSettings.Networks.compose_default.IPAddress'`
+```
+
+With the IP address in hand, you can now start Voltha manually as:
+
+```
+docker run -ti --rm --net=compose_default cord/voltha /voltha/main.py --consul=$CONSUL_IP:8500
+```
+
+This time it should successfully connect to consul and actually register itself.
+You should see a log line simialr to the following:
+
+```
+<timestamp> INFO coordinator.register {name: voltha-1, address: localhost, event: registered-with-consul}
+```
+
+To test Voltha's actual service registration with consul, run this (in another terminal):
+
+```
+curl -s http://localhost:8500/v1/catalog/service/voltha-1 | jq -r .
+```
+
+
## Building natively on MAC OS X
For advanced developers this may provide a more comfortable developer
diff --git a/Makefile b/Makefile
index fa2c0bd..4d586e2 100644
--- a/Makefile
+++ b/Makefile
@@ -40,8 +40,9 @@
find voltha -name '*.pyc' | xargs rm -f
fetch:
- docker pull consul
- docker pull fluent/fluentd
+ docker pull consul:latest
+ docker pull fluent/fluentd:latest
+ docker pull gliderlabs/registrator:latest
purge-venv:
rm -fr ${VENVDIR}
diff --git a/ansible/roles/common/defaults/main.yml b/ansible/roles/common/defaults/main.yml
index d044b7b..12c1af7 100644
--- a/ansible/roles/common/defaults/main.yml
+++ b/ansible/roles/common/defaults/main.yml
@@ -9,6 +9,7 @@
- libssl-dev
- libffi-dev
- python-virtualenv
+ - jq
obsolete_services:
- puppet
diff --git a/compose/README.md b/compose/README.md
new file mode 100644
index 0000000..0d65563
--- /dev/null
+++ b/compose/README.md
@@ -0,0 +1 @@
+Various docker-compose files to assist in system testing.
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
new file mode 100644
index 0000000..1f4f745
--- /dev/null
+++ b/compose/docker-compose-system-test.yml
@@ -0,0 +1,57 @@
+version: '2'
+services:
+ #
+ # Single-node consul agent
+ #
+ consul:
+ image: consul:latest
+ command: agent -server -bootstrap -client 0.0.0.0 -ui
+ ports:
+ - "8300:8300"
+ - "8400:8400"
+ - "8500:8500"
+ - "8600:53/udp"
+ #
+ # Registrator
+ #
+ registrator:
+ image: gliderlabs/registrator:latest
+ command: -ip=10.0.2.15 -retry-attempts 100 consul://consul:8500
+ links:
+ - consul
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ #
+ # Fluentd log server
+ #
+ fluentd:
+ image: fluent/fluentd
+ ports:
+ - "24224:24224"
+ volumes:
+ - "/tmp/fluentd:/fluentd/log"
+ #
+ # Single-node voltha
+ #
+ voltha:
+ image: cord/voltha
+ command: /voltha/main.py -v --consul=consul:8500 --fluentd=fluentd:24224
+ depends_on:
+ - consul
+ links:
+ - consul
+ - fluentd
+ #
+ # Test container to see services available
+ #
+ test:
+ image: busybox
+ command: tail -F /dev/null
+ depends_on:
+ - consul
+ - voltha
+ links:
+ - consul
+ - registrator
+ - fluentd
+ - voltha
diff --git a/voltha/asleep.py b/voltha/asleep.py
new file mode 100644
index 0000000..e1868ab
--- /dev/null
+++ b/voltha/asleep.py
@@ -0,0 +1,27 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Async sleep (asleep) method and other twisted goodies """
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+
+
+def asleep(dt):
+ assert isinstance(dt, (int, float))
+ d = Deferred()
+ reactor.callLater(dt, lambda: d.callback(None))
+ return d
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
new file mode 100644
index 0000000..e893d96
--- /dev/null
+++ b/voltha/coordinator.py
@@ -0,0 +1,116 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Consul-based coordinator services """
+
+from consul import Consul, ConsulException
+from requests import ConnectionError
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+
+from asleep import asleep
+
+
+class Coordinator(object):
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def __init__(self, internal_host_address, external_host_address, instance_id, consul='localhost:8500'):
+
+ self.retries = 0
+ self.instance_id = instance_id
+ self.internal_host_address = internal_host_address
+ self.external_host_address = external_host_address
+
+ self.log = get_logger()
+ self.log.info('initializing-coordinator')
+
+ host = consul.split(':')[0].strip()
+ port = int(consul.split(':')[1].strip())
+ self.consul = Consul(host=host, port=port) # TODO need to handle reconnect events properly
+
+ reactor.callLater(0, self.async_init)
+ self.log.info('initialized-coordinator')
+
+ @inlineCallbacks
+ def async_init(self):
+ yield self.kv_put('voltha/instances/%s/status' % self.instance_id, 'up')
+ yield self.register()
+
+ def backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries, len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ self.log.error(msg + ', retrying in %s second(s)' % wait_time)
+ return asleep(wait_time)
+
+ def clear_backoff(self):
+ if self.retries:
+ self.log.info('Reconnected to consul agent after %d retries' % self.retries)
+ self.retries = 0
+
+ @inlineCallbacks
+ def kv_put(self, key, value, retry=True):
+ while 1:
+ try:
+ self.consul.kv.put(key, value)
+ self.clear_backoff()
+ break
+ except ConsulException, e:
+ if retry:
+ yield self.backoff('Consul not yet up')
+ else:
+ raise e
+ except ConnectionError, e:
+ if retry:
+ yield self.backoff('Cannot connect to consul agent')
+ else:
+ raise e
+ except Exception, e:
+ self.log.exception(e)
+ if retry:
+ yield self.backoff('Unknown error')
+ else:
+ raise e
+
+ @inlineCallbacks
+ def register(self, retry=True):
+ while 1:
+ try:
+ kw = dict(
+ name='voltha-%s' % self.instance_id,
+ address=self.internal_host_address,
+ )
+ self.consul.agent.service.register(**kw)
+ self.log.info('registered-with-consul', **kw)
+ break
+ except ConsulException, e:
+ if retry:
+ yield self.backoff('Consul not yet up')
+ else:
+ raise e
+ except ConnectionError, e:
+ if retry:
+ yield self.backoff('Cannot connect to consul agent')
+ else:
+ raise e
+ except Exception, e:
+ self.log.exception(e)
+ if retry:
+ yield self.backoff('Unknown error')
+ else:
+ raise e
diff --git a/voltha/main.py b/voltha/main.py
index 9f03cf0..200043c 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -19,29 +19,73 @@
import argparse
import os
-
+import time
import yaml
-from structlog_setup import setup_logging, DEFAULT_FLUENT_SERVER
+from structlog_setup import setup_logging
+from coordinator import Coordinator
+
+
+defs = dict(
+ consul=os.environ.get('CONSUL', 'localhost:8500'),
+ instance_id=os.environ.get('INSTANCE_ID', '1'),
+ config=os.environ.get('CONFIG', './voltha.yml'),
+ interface=os.environ.get('INTERFACE', 'eth0'),
+ internal_host_address=os.environ.get('INTERNAL_HOST_ADDRESS', 'localhost'),
+ external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS', 'localhost'),
+ fluentd=os.environ.get('FLUENTD', None)
+)
def parse_args():
parser = argparse.ArgumentParser()
- # generic parameters
- parser.add_argument('--no-banner', dest='no_banner', action='store_true', default=False,
+ parser.add_argument('-c', '--config', dest='config', action='store',
+ default=defs['config'],
+ help='Path to voltha.yml config file (default: %s). '
+ 'If relative, it is relative to main.py of voltha.' % defs['config'])
+
+ parser.add_argument('-C', '--consul', dest='consul', action='store',
+ default=defs['consul'],
+ help='<hostname>:<port> to consul agent (default: %s)' % defs['consul'])
+
+ parser.add_argument('-E', '--external-host-address', dest='external_host_address', action='store',
+ default=defs['external_host_address'],
+ help='<hostname> or <ip> at which Voltha is reachable from outside the cluster'
+ '(default: %s)' % defs['external_host_address'])
+
+ parser.add_argument('-F', '--fluentd', dest='fluentd', action='store',
+ default=defs['fluentd'],
+ help='<hostname>:<port> to fluentd server (default: %s).'
+ '(If not specified (None), the address from the config file is used'
+ % defs['fluentd'])
+
+ parser.add_argument('-H', '--internal-host-address', dest='internal_host_address', action='store',
+ default=defs['internal_host_address'],
+ help='<hostname> or <ip> at which Voltha is reachable from inside the cluster'
+ '(default: %s)' % defs['internal_host_address'])
+
+ parser.add_argument('-i', '--instance-id', dest='instance_id', action='store',
+ default=defs['instance_id'],
+ help='unique string id of this voltha instance (default: %s)' % defs['interface'])
+
+ # TODO placeholder, not used yet
+ parser.add_argument('-I', '--interface', dest='interface', action='store',
+ default=defs['interface'],
+ help='ETH interface to send (default: %s)' % defs['interface'])
+
+ parser.add_argument('-n', '--no-banner', dest='no_banner', action='store_true', default=False,
help='omit startup banner log lines')
- parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', default=False,
- help='enable verbose logging')
+
+ parser.add_argument('-N', '--no-heartbeat', dest='no_heartbeat', action='store_true', default=False,
+ help='do not emit periodic heartbeat log messages')
+
parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False,
help="suppress debug and info logs")
- parser.add_argument('-c', '--config', dest='config', action='store', default='./voltha.yml',
- help='Path to voltha.yml config file. If relative, it is relative to main.py of voltha')
- # placeholder
- parser.add_argument('-i', '--interface', dest='interface', action='store', default='eth0',
- help='ETH interface to send (default: eth0)')
+ parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', default=False,
+ help='enable verbose logging')
return parser.parse_args()
@@ -66,6 +110,16 @@
log.info('(to stop: press Ctrl-C)')
+def startup(log, args, config):
+ log.info('starting-internal-services')
+ coordinator = Coordinator(
+ internal_host_address=args.internal_host_address,
+ external_host_address=args.external_host_address,
+ instance_id=args.instance_id,
+ consul=args.consul)
+ log.info('started-internal-services')
+
+
def cleanup(log):
"""Execute before the reactor is shut down"""
log.info('exiting-on-keyboard-interrupt')
@@ -78,12 +132,35 @@
reactor.run()
+def start_heartbeat(log):
+
+ t0 = time.time()
+ t0s = time.ctime(t0)
+
+ def heartbeat():
+ log.info(status='up', since=t0s, uptime=time.time() - t0)
+
+ from twisted.internet.task import LoopingCall
+ lc = LoopingCall(heartbeat)
+ lc.start(10)
+
+
def main():
+
args = parse_args()
+
config = load_config(args)
- log = setup_logging(config.get('logging', {}))
+
+ log = setup_logging(config.get('logging', {}), fluentd=args.fluentd)
+
if not args.no_banner:
print_banner(args, log)
+
+ if not args.no_heartbeat:
+ start_heartbeat(log)
+
+ startup(log, args, config)
+
start_reactor(args, log) # will not return except Keyboard interrupt
diff --git a/voltha/structlog_setup.py b/voltha/structlog_setup.py
index 2382c0e..e153106 100644
--- a/voltha/structlog_setup.py
+++ b/voltha/structlog_setup.py
@@ -60,7 +60,7 @@
del _repr_running[call_key]
-def setup_logging(log_config):
+def setup_logging(log_config, fluentd=None):
"""
Set up logging such that:
- The primary logging entry method is structlog (see http://structlog.readthedocs.io/en/stable/index.html)
@@ -74,6 +74,18 @@
event_dict['exc_info'] = True
return event_dict
+ # if fluentd is specified, we need to override the config data with its host and port info
+ if fluentd is not None:
+ fluentd_host = fluentd.split(':')[0].strip()
+ fluentd_port = int(fluentd.split(':')[1].strip())
+ handlers = log_config.get('handlers', None)
+ if isinstance(handlers, dict):
+ for _, defs in handlers.iteritems():
+ if isinstance(defs, dict):
+ if defs.get('class', '').endswith('FluentHandler'):
+ defs['host'] = fluentd_host
+ defs['port'] = fluentd_port
+
# Configure standard logging
logging.config.dictConfig(log_config)