Membership ephemeral entry and initial leader election
Also:
- env.sh to load what's needed for dev
- refactored main
- async consul client instead of blocking
diff --git a/Dockerfile b/Dockerfile
index 1a9ecc4..03537b8 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -27,9 +27,9 @@
COPY requirements.txt /tmp/requirements.txt
# Install app dependencies
-RUN apk add build-base gcc abuild binutils python-dev libffi-dev openssl-dev && \
+RUN apk add build-base gcc abuild binutils python-dev libffi-dev openssl-dev git && \
pip install -r /tmp/requirements.txt && \
- apk del --purge build-base gcc abuild binutils python-dev libffi-dev openssl-dev
+ apk del --purge build-base gcc abuild binutils python-dev libffi-dev openssl-dev git
# Bundle app source
COPY voltha /voltha
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index d06aea3..3f7fb0f 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -10,9 +10,9 @@
- "8300:8300"
- "8400:8400"
- "8500:8500"
- - "8600:53/udp"
+ - "8600:8600/udp"
environment:
- SERVICE_53_IGNORE: "yes"
+ #SERVICE_53_IGNORE: "yes"
SERVICE_8300_IGNORE: "yes"
SERVICE_8400_IGNORE: "yes"
SERVICE_8500_NAME: "consul-rest"
@@ -21,7 +21,12 @@
#
registrator:
image: gliderlabs/registrator:latest
- command: -ip=10.0.2.15 -retry-attempts 100 -internal consul://consul:8500
+ command: [
+ "-ip=${DOCKER_HOST_IP}",
+ "-retry-attempts", "100",
+ # "-internal",
+ "consul://consul:8500"
+ ]
links:
- consul
volumes:
@@ -35,6 +40,8 @@
- "24224:24224"
volumes:
- "/tmp/fluentd:/fluentd/log"
+ environment:
+ SERVICE_24224_NAME: "fluentd-intake"
#
# Single-node voltha
#
@@ -45,7 +52,9 @@
"-v",
"--consul=consul:8500",
"--fluentd=fluentd:24224",
- "--rest-port=8880"
+ "--rest-port=8880",
+ "--instance-id-is-container-name",
+ "-v"
]
ports:
- 8880
@@ -59,17 +68,20 @@
SERVICE_8880_CHECK_HTTP: "/health"
SERVICE_8880_CHECK_INTERVAL: "5s"
SERVICE_8880_CHECK_TIMEOUT: "1s"
+ IMAGE_NAME: "{{DCOKER_IMAGE_NAME}}"
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
#
# Test container to see services available
#
- test:
- image: busybox
- command: tail -F /dev/null
- depends_on:
- - consul
- - voltha
- links:
- - consul
- - registrator
- - fluentd
- - voltha
+# test:
+# image: busybox
+# command: tail -F /dev/null
+# depends_on:
+# - consul
+# - voltha
+# links:
+# - consul
+# - registrator
+# - fluentd
+# - voltha
diff --git a/env.sh b/env.sh
new file mode 100644
index 0000000..19c99fa
--- /dev/null
+++ b/env.sh
@@ -0,0 +1,9 @@
+# sourcing this file is needed to make local development and integration testing work
+
+# load local python virtualenv
+. venv/bin/activate
+
+
+# assign DOCKER_HOST_IP to be the main ip address of this host
+export DOCKER_HOST_IP=$(python voltha/nethelpers.py)
+
diff --git a/requirements.txt b/requirements.txt
index 3d312e2..d3643a6 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
argparse==1.2.1
colorama>=0.2.5
decorator>=3.4.0
+docker-py
flake8>=2.5.1
fluent-logger>=0.4.3
klein>=15.3.1
@@ -12,7 +13,6 @@
pyflakes>=1.0.0
pylint>=1.5.2
pyOpenSSL>=0.13
-python-consul>=0.6.1
PyYAML>=3.10
requests
scapy>=2.3.2
@@ -20,5 +20,10 @@
simplejson>=3.8.1
six>=1.10.0
structlog>=16.1.0
+treq>=15.1.0
Twisted>=13.2.0
urllib3>=1.7.1
+
+# python-consul>=0.6.1 we need the pre-released version for now, because 0.6.1 does not
+# yet support Twisted. Once this is released, it will be the 0.6.2 version
+git+git://github.com/cablehead/python-consul.git
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 1fa2b18..bd55049 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -23,11 +23,13 @@
# was cut before twisted support was added. So keep an eye on when 0.6.2 comes out and
# move over to the twisted interface once it's available.
-from consul import Consul, ConsulException
+from consul import Check, ConsulException
+from consul.twisted import Consul
from requests import ConnectionError
from structlog import get_logger
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.task import LoopingCall
from asleep import asleep
@@ -50,6 +52,9 @@
self.external_host_address = external_host_address
self.rest_port = rest_port
+ self.session_id = None
+ self.i_am_leader = False
+
self.log = get_logger()
self.log.info('initializing-coordinator')
@@ -61,9 +66,15 @@
self.log.info('initialized-coordinator')
@inlineCallbacks
+ def shutdown(self):
+ yield self.delete_session()
+
+ @inlineCallbacks
def async_init(self):
- yield self.kv_put('voltha/instances/%s/status' % self.instance_id, 'up')
- yield self.register()
+ # yield self.kv_put('voltha/instances/%s/status' % self.instance_id, 'up')
+ yield self.create_session()
+ yield self.create_membership_record()
+ yield self.elect_leader()
def backoff(self, msg):
wait_time = self.RETRY_BACKOFF[min(self.retries, len(self.RETRY_BACKOFF) - 1)]
@@ -80,9 +91,10 @@
def kv_put(self, key, value, retry=True):
while 1:
try:
- self.consul.kv.put(key, value)
+ response = yield self.consul.kv.put(key, value)
self.clear_backoff()
- break
+ returnValue(response)
+
except ConsulException, e:
if retry:
yield self.backoff('Consul not yet up')
@@ -101,30 +113,67 @@
raise e
@inlineCallbacks
- def register(self, retry=True):
+ def create_session(self):
+
+ @inlineCallbacks
+ def _renew_session():
+ try:
+ result = yield self.consul.session.renew(session_id=self.session_id)
+ self.log.debug('just renewed session', result=result)
+ except Exception, e:
+ self.log.exception('could-not-renew-session', e=e)
+
+ @inlineCallbacks
+ def _create_session():
+
+ # create consul session
+ self.session_id = yield self.consul.session.create(behavior='delete', ttl=10)
+ self.log.info('created-consul-session', session_id=self.session_id)
+
+ # start renewing session it 3 times within the ttl
+ lc = LoopingCall(_renew_session)
+ lc.start(3)
+
+ yield self._retry(_create_session)
+
+ @inlineCallbacks
+ def delete_session(self):
+ yield self.consul.session.destroy(self.session_id)
+
+ @inlineCallbacks
+ def create_membership_record(self):
+ # create ephemeral k/v registering this instance in the service/voltha/members/<instance-id> node
+ result = yield self.consul.kv.put('service/voltha/members/%s' % self.instance_id, 'alive',
+ acquire=self.session_id)
+ assert result is True
+
+ @inlineCallbacks
+ def elect_leader(self):
+ """Attempt to become the leader by acquiring the leader key and track the leader anyway"""
+
+ # attempt acquire leader lock
+ result = yield self.consul.kv.put('service/voltha/leader', self.instance_id,
+ acquire=self.session_id)
+
+ # read it back before being too happy; seeing our session id is a proof and now we have
+ # the change id that we can use to reliably track any changes
+
+ # TODO continue from here !!!
+ if result is True:
+ self.i_am_leader = True
+
+ @inlineCallbacks
+ def _retry(self, func, *args, **kw):
while 1:
try:
- kw = dict(
- name='voltha-%s' % self.instance_id,
- address=self.internal_host_address,
- port=self.rest_port
- )
- self.consul.agent.service.register(**kw)
- self.log.info('registered-with-consul', **kw)
+ result = yield func(*args, **kw)
break
except ConsulException, e:
- if retry:
- yield self.backoff('Consul not yet up')
- else:
- raise e
+ yield self.backoff('Consul not yet up')
except ConnectionError, e:
- if retry:
- yield self.backoff('Cannot connect to consul agent')
- else:
- raise e
+ yield self.backoff('Cannot connect to consul agent')
except Exception, e:
self.log.exception(e)
- if retry:
- yield self.backoff('Unknown error')
- else:
- raise e
+ yield self.backoff('Unknown error')
+
+ returnValue(result)
diff --git a/voltha/dockerhelpers.py b/voltha/dockerhelpers.py
new file mode 100644
index 0000000..ee6cd7a
--- /dev/null
+++ b/voltha/dockerhelpers.py
@@ -0,0 +1,49 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some docker related convenience functions
+"""
+
+import os
+from structlog import get_logger
+
+from docker import Client
+
+
+log = get_logger()
+
+
+def get_my_containers_name():
+ """
+ Return the docker containers name in which this process is running.
+ To look up the container name, we use the container ID extracted from the $HOSTNAME
+ environment variable (which is set by docker conventions).
+ :return: String with the docker container name (or None if any issue is encountered)
+ """
+ my_container_id = os.environ.get('HOSTNAME', None)
+
+ try:
+ docker_cli = Client(base_url='unix://tmp/docker.sock')
+ info = docker_cli.inspect_container(my_container_id)
+
+ except Exception, e:
+ log.exception('failed', my_container_id=my_container_id, e=e)
+ raise
+
+ name = info['Name'].lstrip('/')
+
+ return name
diff --git a/voltha/main.py b/voltha/main.py
index 188e6aa..a85fb2f 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -21,11 +21,14 @@
import os
import time
import yaml
+from twisted.internet.defer import inlineCallbacks
from structlog_setup import setup_logging
from coordinator import Coordinator
from northbound.rest.health_check import init_rest_service
from nethelpers import get_my_primary_interface, get_my_primary_local_ipv4
+from dockerhelpers import get_my_containers_name
+
defs = dict(
consul=os.environ.get('CONSUL', 'localhost:8500'),
@@ -93,7 +96,18 @@
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', default=False,
help='enable verbose logging')
- return parser.parse_args()
+ parser.add_argument('--instance-id-is-container-name', dest='instance_id_is_container_name', action='store_true',
+ default=False, help='use docker container name as voltha instance id'
+ ' (overrides -i/--instance-id option)')
+
+ args = parser.parse_args()
+
+ # post-processing
+
+ if args.instance_id_is_container_name:
+ args.instance_id = get_my_containers_name()
+
+ return args
def load_config(args):
@@ -107,7 +121,7 @@
return config
-def print_banner(args, log):
+def print_banner(log):
log.info(' _ ______ __ ________ _____ ')
log.info('| | / / __ \/ / /_ __/ / / / |')
log.info('| | / / / / / / / / / /_/ / /| |')
@@ -116,61 +130,62 @@
log.info('(to stop: press Ctrl-C)')
-def startup(log, args, config):
- log.info('starting-internal-services')
- coordinator = Coordinator(
- internal_host_address=args.internal_host_address,
- external_host_address=args.external_host_address,
- rest_port=args.rest_port,
- instance_id=args.instance_id,
- consul=args.consul)
- init_rest_service(args.rest_port)
- log.info('started-internal-services')
+class Main(object):
+ def __init__(self):
+ self.args = args = parse_args()
+ self.config = load_config(args)
+ self.log = setup_logging(self.config.get('logging', {}), args.instance_id, fluentd=args.fluentd)
-def cleanup(log):
- """Execute before the reactor is shut down"""
- log.info('exiting-on-keyboard-interrupt')
+ # components
+ self.coordinator = None
+ if not args.no_banner:
+ print_banner(self.log)
-def start_reactor(args, log):
- from twisted.internet import reactor
- reactor.callWhenRunning(lambda: log.info('twisted-reactor-started'))
- reactor.addSystemEventTrigger('before', 'shutdown', lambda: cleanup(log))
- reactor.run()
+ if not args.no_heartbeat:
+ self.start_heartbeat()
+ self.startup_components()
-def start_heartbeat(log):
+ def start(self):
+ self.start_reactor() # will not return except Keyboard interrupt
- t0 = time.time()
- t0s = time.ctime(t0)
+ def startup_components(self):
+ self.log.info('starting-internal-components')
+ self.coordinator = Coordinator(
+ internal_host_address=self.args.internal_host_address,
+ external_host_address=self.args.external_host_address,
+ rest_port=self.args.rest_port,
+ instance_id=self.args.instance_id,
+ consul=self.args.consul)
+ init_rest_service(self.args.rest_port)
+ self.log.info('started-internal-services')
- def heartbeat():
- log.info(status='up', since=t0s, uptime=time.time() - t0)
+ @inlineCallbacks
+ def shutdown_components(self):
+ """Execute before the reactor is shut down"""
+ self.log.info('exiting-on-keyboard-interrupt')
+ yield self.coordinator.shutdown()
- from twisted.internet.task import LoopingCall
- lc = LoopingCall(heartbeat)
- lc.start(10)
+ def start_reactor(self):
+ from twisted.internet import reactor
+ reactor.callWhenRunning(lambda: self.log.info('twisted-reactor-started'))
+ reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown_components)
+ reactor.run()
+ def start_heartbeat(self):
-def main():
+ t0 = time.time()
+ t0s = time.ctime(t0)
- args = parse_args()
+ def heartbeat():
+ self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
- config = load_config(args)
-
- log = setup_logging(config.get('logging', {}), fluentd=args.fluentd)
-
- if not args.no_banner:
- print_banner(args, log)
-
- if not args.no_heartbeat:
- start_heartbeat(log)
-
- startup(log, args, config)
-
- start_reactor(args, log) # will not return except Keyboard interrupt
+ from twisted.internet.task import LoopingCall
+ lc = LoopingCall(heartbeat)
+ lc.start(10)
if __name__ == '__main__':
- main()
+ Main().start()
diff --git a/voltha/nethelpers.py b/voltha/nethelpers.py
index 09f73c1..2fa7bd5 100644
--- a/voltha/nethelpers.py
+++ b/voltha/nethelpers.py
@@ -39,3 +39,7 @@
addresses = ni.ifaddresses(iface_name)
ipv4 = addresses[AF_INET][0]['addr']
return ipv4
+
+
+if __name__ == '__main__':
+ print get_my_primary_local_ipv4()
diff --git a/voltha/northbound/rest/health_check.py b/voltha/northbound/rest/health_check.py
index ea5bff9..5038e30 100644
--- a/voltha/northbound/rest/health_check.py
+++ b/voltha/northbound/rest/health_check.py
@@ -18,6 +18,7 @@
""" Rest API to check health of Voltha instance """
from klein import Klein
+from structlog import get_logger
from twisted.internet import endpoints
from twisted.internet import reactor
from twisted.web.server import Site
@@ -26,10 +27,12 @@
class HealthCheck(object):
app = Klein()
+ log = get_logger()
@app.route('/health')
def health_check(self, request):
# TODO this is just a placeholder, very crude health check
+ self.log.debug("health-check-received")
return '{"status": "ok"}'
def get_site(self):
diff --git a/voltha/structlog_setup.py b/voltha/structlog_setup.py
index 53bfc72..299b0ac 100644
--- a/voltha/structlog_setup.py
+++ b/voltha/structlog_setup.py
@@ -57,7 +57,7 @@
del _repr_running[call_key]
-def setup_logging(log_config, fluentd=None):
+def setup_logging(log_config, instance_id, fluentd=None):
"""
Set up logging such that:
- The primary logging entry method is structlog (see http://structlog.readthedocs.io/en/stable/index.html)
@@ -66,11 +66,14 @@
bridge to a fluent logging agent.
"""
- def add_exc_info_flag_for_exception(logger, name, event_dict):
+ def add_exc_info_flag_for_exception(_, name, event_dict):
if name == 'exception':
event_dict['exc_info'] = True
return event_dict
+ def add_instance_id(_, __, event_dict):
+ event_dict['instance_id'] = instance_id
+ return event_dict
# if fluentd is specified, we need to override the config data with its host and port info
if fluentd is not None:
fluentd_host = fluentd.split(':')[0].strip()
@@ -90,6 +93,7 @@
add_exc_info_flag_for_exception,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
+ add_instance_id,
FluentRenderer(),
]
structlog.configure(logger_factory=structlog.stdlib.LoggerFactory(),